1 #![warn(rust_2018_idioms)]
2 #![cfg(feature = "full")]
3
4 use futures::future::poll_fn;
5 use std::io;
6 use std::sync::Arc;
7 use tokio::{io::ReadBuf, net::UdpSocket};
8 use tokio_test::assert_ok;
9
// Payload exchanged by most of the tests in this file.
const MSG: &[u8] = b"hello";
// Length of `MSG` in bytes; `try_send_spawn` compares it against the byte
// counts returned by `try_send_to` / `try_send`.
const MSG_LEN: usize = MSG.len();
12
13 #[tokio::test]
send_recv() -> std::io::Result<()>14 async fn send_recv() -> std::io::Result<()> {
15 let sender = UdpSocket::bind("127.0.0.1:0").await?;
16 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
17
18 sender.connect(receiver.local_addr()?).await?;
19 receiver.connect(sender.local_addr()?).await?;
20
21 sender.send(MSG).await?;
22
23 let mut recv_buf = [0u8; 32];
24 let len = receiver.recv(&mut recv_buf[..]).await?;
25
26 assert_eq!(&recv_buf[..len], MSG);
27 Ok(())
28 }
29
30 #[tokio::test]
send_recv_poll() -> std::io::Result<()>31 async fn send_recv_poll() -> std::io::Result<()> {
32 let sender = UdpSocket::bind("127.0.0.1:0").await?;
33 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
34
35 sender.connect(receiver.local_addr()?).await?;
36 receiver.connect(sender.local_addr()?).await?;
37
38 poll_fn(|cx| sender.poll_send(cx, MSG)).await?;
39
40 let mut recv_buf = [0u8; 32];
41 let mut read = ReadBuf::new(&mut recv_buf);
42 let _len = poll_fn(|cx| receiver.poll_recv(cx, &mut read)).await?;
43
44 assert_eq!(read.filled(), MSG);
45 Ok(())
46 }
47
48 #[tokio::test]
send_to_recv_from() -> std::io::Result<()>49 async fn send_to_recv_from() -> std::io::Result<()> {
50 let sender = UdpSocket::bind("127.0.0.1:0").await?;
51 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
52
53 let receiver_addr = receiver.local_addr()?;
54 sender.send_to(MSG, &receiver_addr).await?;
55
56 let mut recv_buf = [0u8; 32];
57 let (len, addr) = receiver.recv_from(&mut recv_buf[..]).await?;
58
59 assert_eq!(&recv_buf[..len], MSG);
60 assert_eq!(addr, sender.local_addr()?);
61 Ok(())
62 }
63
64 #[tokio::test]
send_to_recv_from_poll() -> std::io::Result<()>65 async fn send_to_recv_from_poll() -> std::io::Result<()> {
66 let sender = UdpSocket::bind("127.0.0.1:0").await?;
67 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
68
69 let receiver_addr = receiver.local_addr()?;
70 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
71
72 let mut recv_buf = [0u8; 32];
73 let mut read = ReadBuf::new(&mut recv_buf);
74 let addr = poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
75
76 assert_eq!(read.filled(), MSG);
77 assert_eq!(addr, sender.local_addr()?);
78 Ok(())
79 }
80
81 #[tokio::test]
send_to_peek_from() -> std::io::Result<()>82 async fn send_to_peek_from() -> std::io::Result<()> {
83 let sender = UdpSocket::bind("127.0.0.1:0").await?;
84 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
85
86 let receiver_addr = receiver.local_addr()?;
87 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
88
89 // peek
90 let mut recv_buf = [0u8; 32];
91 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
92 assert_eq!(&recv_buf[..n], MSG);
93 assert_eq!(addr, sender.local_addr()?);
94
95 // peek
96 let mut recv_buf = [0u8; 32];
97 let (n, addr) = receiver.peek_from(&mut recv_buf).await?;
98 assert_eq!(&recv_buf[..n], MSG);
99 assert_eq!(addr, sender.local_addr()?);
100
101 let mut recv_buf = [0u8; 32];
102 let (n, addr) = receiver.recv_from(&mut recv_buf).await?;
103 assert_eq!(&recv_buf[..n], MSG);
104 assert_eq!(addr, sender.local_addr()?);
105
106 Ok(())
107 }
108
109 #[tokio::test]
send_to_peek_from_poll() -> std::io::Result<()>110 async fn send_to_peek_from_poll() -> std::io::Result<()> {
111 let sender = UdpSocket::bind("127.0.0.1:0").await?;
112 let receiver = UdpSocket::bind("127.0.0.1:0").await?;
113
114 let receiver_addr = receiver.local_addr()?;
115 poll_fn(|cx| sender.poll_send_to(cx, MSG, receiver_addr)).await?;
116
117 let mut recv_buf = [0u8; 32];
118 let mut read = ReadBuf::new(&mut recv_buf);
119 let addr = poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
120
121 assert_eq!(read.filled(), MSG);
122 assert_eq!(addr, sender.local_addr()?);
123
124 let mut recv_buf = [0u8; 32];
125 let mut read = ReadBuf::new(&mut recv_buf);
126 poll_fn(|cx| receiver.poll_peek_from(cx, &mut read)).await?;
127
128 assert_eq!(read.filled(), MSG);
129 let mut recv_buf = [0u8; 32];
130 let mut read = ReadBuf::new(&mut recv_buf);
131
132 poll_fn(|cx| receiver.poll_recv_from(cx, &mut read)).await?;
133 assert_eq!(read.filled(), MSG);
134 Ok(())
135 }
136
137 #[tokio::test]
split() -> std::io::Result<()>138 async fn split() -> std::io::Result<()> {
139 let socket = UdpSocket::bind("127.0.0.1:0").await?;
140 let s = Arc::new(socket);
141 let r = s.clone();
142
143 let addr = s.local_addr()?;
144 tokio::spawn(async move {
145 s.send_to(MSG, &addr).await.unwrap();
146 });
147 let mut recv_buf = [0u8; 32];
148 let (len, _) = r.recv_from(&mut recv_buf[..]).await?;
149 assert_eq!(&recv_buf[..len], MSG);
150 Ok(())
151 }
152
153 #[tokio::test]
split_chan() -> std::io::Result<()>154 async fn split_chan() -> std::io::Result<()> {
155 // setup UdpSocket that will echo all sent items
156 let socket = UdpSocket::bind("127.0.0.1:0").await?;
157 let addr = socket.local_addr().unwrap();
158 let s = Arc::new(socket);
159 let r = s.clone();
160
161 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
162 tokio::spawn(async move {
163 while let Some((bytes, addr)) = rx.recv().await {
164 s.send_to(&bytes, &addr).await.unwrap();
165 }
166 });
167
168 tokio::spawn(async move {
169 let mut buf = [0u8; 32];
170 loop {
171 let (len, addr) = r.recv_from(&mut buf).await.unwrap();
172 tx.send((buf[..len].to_vec(), addr)).await.unwrap();
173 }
174 });
175
176 // test that we can send a value and get back some response
177 let sender = UdpSocket::bind("127.0.0.1:0").await?;
178 sender.send_to(MSG, addr).await?;
179 let mut recv_buf = [0u8; 32];
180 let (len, _) = sender.recv_from(&mut recv_buf).await?;
181 assert_eq!(&recv_buf[..len], MSG);
182 Ok(())
183 }
184
185 #[tokio::test]
split_chan_poll() -> std::io::Result<()>186 async fn split_chan_poll() -> std::io::Result<()> {
187 // setup UdpSocket that will echo all sent items
188 let socket = UdpSocket::bind("127.0.0.1:0").await?;
189 let addr = socket.local_addr().unwrap();
190 let s = Arc::new(socket);
191 let r = s.clone();
192
193 let (tx, mut rx) = tokio::sync::mpsc::channel::<(Vec<u8>, std::net::SocketAddr)>(1_000);
194 tokio::spawn(async move {
195 while let Some((bytes, addr)) = rx.recv().await {
196 poll_fn(|cx| s.poll_send_to(cx, &bytes, addr))
197 .await
198 .unwrap();
199 }
200 });
201
202 tokio::spawn(async move {
203 let mut recv_buf = [0u8; 32];
204 let mut read = ReadBuf::new(&mut recv_buf);
205 loop {
206 let addr = poll_fn(|cx| r.poll_recv_from(cx, &mut read)).await.unwrap();
207 tx.send((read.filled().to_vec(), addr)).await.unwrap();
208 }
209 });
210
211 // test that we can send a value and get back some response
212 let sender = UdpSocket::bind("127.0.0.1:0").await?;
213 poll_fn(|cx| sender.poll_send_to(cx, MSG, addr)).await?;
214
215 let mut recv_buf = [0u8; 32];
216 let mut read = ReadBuf::new(&mut recv_buf);
217 let _ = poll_fn(|cx| sender.poll_recv_from(cx, &mut read)).await?;
218 assert_eq!(read.filled(), MSG);
219 Ok(())
220 }
221
// # Note
//
// This test is purposely written such that each time `sender` sends data on
// the socket, `receiver` awaits the data. On Unix, it would be okay to wait
// until the end of the test to receive all the data. On Windows, this would
// **not** be okay because its resources are completion based (via IOCP).
// If data is sent and not yet received, attempting to send more data will
// result in `ErrorKind::WouldBlock` until the first operation completes.
230 #[tokio::test]
try_send_spawn()231 async fn try_send_spawn() {
232 const MSG2: &[u8] = b"world!";
233 const MSG2_LEN: usize = MSG2.len();
234
235 let sender = UdpSocket::bind("127.0.0.1:0").await.unwrap();
236 let receiver = UdpSocket::bind("127.0.0.1:0").await.unwrap();
237
238 receiver
239 .connect(sender.local_addr().unwrap())
240 .await
241 .unwrap();
242
243 sender.writable().await.unwrap();
244
245 let sent = &sender
246 .try_send_to(MSG, receiver.local_addr().unwrap())
247 .unwrap();
248 assert_eq!(sent, &MSG_LEN);
249 let mut buf = [0u8; 32];
250 let mut received = receiver.recv(&mut buf[..]).await.unwrap();
251
252 sender
253 .connect(receiver.local_addr().unwrap())
254 .await
255 .unwrap();
256 let sent = &sender.try_send(MSG2).unwrap();
257 assert_eq!(sent, &MSG2_LEN);
258 received += receiver.recv(&mut buf[..]).await.unwrap();
259
260 std::thread::spawn(move || {
261 let sent = &sender.try_send(MSG).unwrap();
262 assert_eq!(sent, &MSG_LEN);
263 })
264 .join()
265 .unwrap();
266 received += receiver.recv(&mut buf[..]).await.unwrap();
267
268 assert_eq!(received, MSG_LEN * 2 + MSG2_LEN);
269 }
270
271 #[tokio::test]
try_send_recv()272 async fn try_send_recv() {
273 // Create listener
274 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
275
276 // Create socket pair
277 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
278
279 // Connect the two
280 client.connect(server.local_addr().unwrap()).await.unwrap();
281 server.connect(client.local_addr().unwrap()).await.unwrap();
282
283 for _ in 0..5 {
284 loop {
285 client.writable().await.unwrap();
286
287 match client.try_send(b"hello world") {
288 Ok(n) => {
289 assert_eq!(n, 11);
290 break;
291 }
292 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
293 Err(e) => panic!("{:?}", e),
294 }
295 }
296
297 loop {
298 server.readable().await.unwrap();
299
300 let mut buf = [0; 512];
301
302 match server.try_recv(&mut buf) {
303 Ok(n) => {
304 assert_eq!(n, 11);
305 assert_eq!(&buf[0..11], &b"hello world"[..]);
306 break;
307 }
308 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
309 Err(e) => panic!("{:?}", e),
310 }
311 }
312 }
313 }
314
315 #[tokio::test]
try_send_to_recv_from()316 async fn try_send_to_recv_from() {
317 // Create listener
318 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
319 let saddr = server.local_addr().unwrap();
320
321 // Create socket pair
322 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
323 let caddr = client.local_addr().unwrap();
324
325 for _ in 0..5 {
326 loop {
327 client.writable().await.unwrap();
328
329 match client.try_send_to(b"hello world", saddr) {
330 Ok(n) => {
331 assert_eq!(n, 11);
332 break;
333 }
334 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
335 Err(e) => panic!("{:?}", e),
336 }
337 }
338
339 loop {
340 server.readable().await.unwrap();
341
342 let mut buf = [0; 512];
343
344 match server.try_recv_from(&mut buf) {
345 Ok((n, addr)) => {
346 assert_eq!(n, 11);
347 assert_eq!(addr, caddr);
348 assert_eq!(&buf[0..11], &b"hello world"[..]);
349 break;
350 }
351 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
352 Err(e) => panic!("{:?}", e),
353 }
354 }
355 }
356 }
357
358 #[tokio::test]
try_recv_buf()359 async fn try_recv_buf() {
360 // Create listener
361 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
362
363 // Create socket pair
364 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
365
366 // Connect the two
367 client.connect(server.local_addr().unwrap()).await.unwrap();
368 server.connect(client.local_addr().unwrap()).await.unwrap();
369
370 for _ in 0..5 {
371 loop {
372 client.writable().await.unwrap();
373
374 match client.try_send(b"hello world") {
375 Ok(n) => {
376 assert_eq!(n, 11);
377 break;
378 }
379 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
380 Err(e) => panic!("{:?}", e),
381 }
382 }
383
384 loop {
385 server.readable().await.unwrap();
386
387 let mut buf = Vec::with_capacity(512);
388
389 match server.try_recv_buf(&mut buf) {
390 Ok(n) => {
391 assert_eq!(n, 11);
392 assert_eq!(&buf[0..11], &b"hello world"[..]);
393 break;
394 }
395 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
396 Err(e) => panic!("{:?}", e),
397 }
398 }
399 }
400 }
401
402 #[tokio::test]
try_recv_buf_from()403 async fn try_recv_buf_from() {
404 // Create listener
405 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
406 let saddr = server.local_addr().unwrap();
407
408 // Create socket pair
409 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
410 let caddr = client.local_addr().unwrap();
411
412 for _ in 0..5 {
413 loop {
414 client.writable().await.unwrap();
415
416 match client.try_send_to(b"hello world", saddr) {
417 Ok(n) => {
418 assert_eq!(n, 11);
419 break;
420 }
421 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
422 Err(e) => panic!("{:?}", e),
423 }
424 }
425
426 loop {
427 server.readable().await.unwrap();
428
429 let mut buf = Vec::with_capacity(512);
430
431 match server.try_recv_buf_from(&mut buf) {
432 Ok((n, addr)) => {
433 assert_eq!(n, 11);
434 assert_eq!(addr, caddr);
435 assert_eq!(&buf[0..11], &b"hello world"[..]);
436 break;
437 }
438 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
439 Err(e) => panic!("{:?}", e),
440 }
441 }
442 }
443 }
444
445 #[tokio::test]
poll_ready()446 async fn poll_ready() {
447 // Create listener
448 let server = UdpSocket::bind("127.0.0.1:0").await.unwrap();
449 let saddr = server.local_addr().unwrap();
450
451 // Create socket pair
452 let client = UdpSocket::bind("127.0.0.1:0").await.unwrap();
453 let caddr = client.local_addr().unwrap();
454
455 for _ in 0..5 {
456 loop {
457 assert_ok!(poll_fn(|cx| client.poll_send_ready(cx)).await);
458
459 match client.try_send_to(b"hello world", saddr) {
460 Ok(n) => {
461 assert_eq!(n, 11);
462 break;
463 }
464 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
465 Err(e) => panic!("{:?}", e),
466 }
467 }
468
469 loop {
470 assert_ok!(poll_fn(|cx| server.poll_recv_ready(cx)).await);
471
472 let mut buf = Vec::with_capacity(512);
473
474 match server.try_recv_buf_from(&mut buf) {
475 Ok((n, addr)) => {
476 assert_eq!(n, 11);
477 assert_eq!(addr, caddr);
478 assert_eq!(&buf[0..11], &b"hello world"[..]);
479 break;
480 }
481 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
482 Err(e) => panic!("{:?}", e),
483 }
484 }
485 }
486 }
487