1 use crate::io::{Interest, PollEvented, ReadBuf, Ready}; 2 use crate::net::{to_socket_addrs, ToSocketAddrs}; 3 4 use std::convert::TryFrom; 5 use std::fmt; 6 use std::io; 7 use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr}; 8 use std::task::{Context, Poll}; 9 10 cfg_io_util! { 11 use bytes::BufMut; 12 } 13 14 cfg_net! { 15 /// A UDP socket. 16 /// 17 /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket` 18 /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`: 19 /// 20 /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`) 21 /// and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses 22 /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`) 23 /// and [`recv`](`UdpSocket::recv`) to communicate only with that remote address 24 /// 25 /// This type does not provide a `split` method, because this functionality 26 /// can be achieved by instead wrapping the socket in an [`Arc`]. Note that 27 /// you do not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>` 28 /// is enough. This is because all of the methods take `&self` instead of 29 /// `&mut self`. Once you have wrapped it in an `Arc`, you can call 30 /// `.clone()` on the `Arc<UdpSocket>` to get multiple shared handles to the 31 /// same socket. An example of such usage can be found further down. 32 /// 33 /// [`Arc`]: std::sync::Arc 34 /// 35 /// # Streams 36 /// 37 /// If you need to listen over UDP and produce a [`Stream`], you can look 38 /// at [`UdpFramed`]. 39 /// 40 /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html 41 /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html 42 /// 43 /// # Example: one to many (bind) 44 /// 45 /// Using `bind` we can create a simple echo server that sends and recv's with many different clients: 46 /// ```no_run 47 /// use tokio::net::UdpSocket; 48 /// use std::io; 49 /// 50 /// #[tokio::main] 51 /// async fn main() -> io::Result<()> { 52 /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; 53 /// let mut buf = [0; 1024]; 54 /// loop { 55 /// let (len, addr) = sock.recv_from(&mut buf).await?; 56 /// println!("{:?} bytes received from {:?}", len, addr); 57 /// 58 /// let len = sock.send_to(&buf[..len], addr).await?; 59 /// println!("{:?} bytes sent", len); 60 /// } 61 /// } 62 /// ``` 63 /// 64 /// # Example: one to one (connect) 65 /// 66 /// Or using `connect` we can echo with a single remote address using `send` and `recv`: 67 /// ```no_run 68 /// use tokio::net::UdpSocket; 69 /// use std::io; 70 /// 71 /// #[tokio::main] 72 /// async fn main() -> io::Result<()> { 73 /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; 74 /// 75 /// let remote_addr = "127.0.0.1:59611"; 76 /// sock.connect(remote_addr).await?; 77 /// let mut buf = [0; 1024]; 78 /// loop { 79 /// let len = sock.recv(&mut buf).await?; 80 /// println!("{:?} bytes received from {:?}", len, remote_addr); 81 /// 82 /// let len = sock.send(&buf[..len]).await?; 83 /// println!("{:?} bytes sent", len); 84 /// } 85 /// } 86 /// ``` 87 /// 88 /// # Example: Splitting with `Arc` 89 /// 90 /// Because `send_to` and `recv_from` take `&self`. It's perfectly alright 91 /// to use an `Arc<UdpSocket>` and share the references to multiple tasks. 92 /// Here is a similar "echo" example that supports concurrent 93 /// sending/receiving: 94 /// 95 /// ```no_run 96 /// use tokio::{net::UdpSocket, sync::mpsc}; 97 /// use std::{io, net::SocketAddr, sync::Arc}; 98 /// 99 /// #[tokio::main] 100 /// async fn main() -> io::Result<()> { 101 /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; 102 /// let r = Arc::new(sock); 103 /// let s = r.clone(); 104 /// let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000); 105 /// 106 /// tokio::spawn(async move { 107 /// while let Some((bytes, addr)) = rx.recv().await { 108 /// let len = s.send_to(&bytes, &addr).await.unwrap(); 109 /// println!("{:?} bytes sent", len); 110 /// } 111 /// }); 112 /// 113 /// let mut buf = [0; 1024]; 114 /// loop { 115 /// let (len, addr) = r.recv_from(&mut buf).await?; 116 /// println!("{:?} bytes received from {:?}", len, addr); 117 /// tx.send((buf[..len].to_vec(), addr)).await.unwrap(); 118 /// } 119 /// } 120 /// ``` 121 /// 122 pub struct UdpSocket { 123 io: PollEvented<mio::net::UdpSocket>, 124 } 125 } 126 127 impl UdpSocket { 128 /// This function will create a new UDP socket and attempt to bind it to 129 /// the `addr` provided. 130 /// 131 /// # Example 132 /// 133 /// ```no_run 134 /// use tokio::net::UdpSocket; 135 /// use std::io; 136 /// 137 /// #[tokio::main] 138 /// async fn main() -> io::Result<()> { 139 /// let sock = UdpSocket::bind("0.0.0.0:8080").await?; 140 /// // use `sock` 141 /// # let _ = sock; 142 /// Ok(()) 143 /// } 144 /// ``` bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket>145 pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> { 146 let addrs = to_socket_addrs(addr).await?; 147 let mut last_err = None; 148 149 for addr in addrs { 150 match UdpSocket::bind_addr(addr) { 151 Ok(socket) => return Ok(socket), 152 Err(e) => last_err = Some(e), 153 } 154 } 155 156 Err(last_err.unwrap_or_else(|| { 157 io::Error::new( 158 io::ErrorKind::InvalidInput, 159 "could not resolve to any address", 160 ) 161 })) 162 } 163 bind_addr(addr: SocketAddr) -> io::Result<UdpSocket>164 fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> { 165 let sys = mio::net::UdpSocket::bind(addr)?; 166 UdpSocket::new(sys) 167 } 168 new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket>169 fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> { 170 let io = PollEvented::new(socket)?; 171 Ok(UdpSocket { io }) 172 } 173 174 /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`. 175 /// 176 /// This function is intended to be used to wrap a UDP socket from the 177 /// standard library in the Tokio equivalent. The conversion assumes nothing 178 /// about the underlying socket; it is left up to the user to set it in 179 /// non-blocking mode. 180 /// 181 /// This can be used in conjunction with socket2's `Socket` interface to 182 /// configure a socket before it's handed off, such as setting options like 183 /// `reuse_address` or binding to multiple addresses. 184 /// 185 /// # Panics 186 /// 187 /// This function panics if thread-local runtime is not set. 188 /// 189 /// The runtime is usually set implicitly when this function is called 190 /// from a future driven by a tokio runtime, otherwise runtime can be set 191 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 192 /// 193 /// # Example 194 /// 195 /// ```no_run 196 /// use tokio::net::UdpSocket; 197 /// # use std::{io, net::SocketAddr}; 198 /// 199 /// # #[tokio::main] 200 /// # async fn main() -> io::Result<()> { 201 /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); 202 /// let std_sock = std::net::UdpSocket::bind(addr)?; 203 /// std_sock.set_nonblocking(true)?; 204 /// let sock = UdpSocket::from_std(std_sock)?; 205 /// // use `sock` 206 /// # Ok(()) 207 /// # } 208 /// ``` from_std(socket: net::UdpSocket) -> io::Result<UdpSocket>209 pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> { 210 let io = mio::net::UdpSocket::from_std(socket); 211 UdpSocket::new(io) 212 } 213 214 /// Turns a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`]. 215 /// 216 /// The returned [`std::net::UdpSocket`] will have nonblocking mode set as 217 /// `true`. Use [`set_nonblocking`] to change the blocking mode if needed. 218 /// 219 /// # Examples 220 /// 221 /// ```rust,no_run 222 /// use std::error::Error; 223 /// 224 /// #[tokio::main] 225 /// async fn main() -> Result<(), Box<dyn Error>> { 226 /// let tokio_socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await?; 227 /// let std_socket = tokio_socket.into_std()?; 228 /// std_socket.set_nonblocking(false)?; 229 /// Ok(()) 230 /// } 231 /// ``` 232 /// 233 /// [`tokio::net::UdpSocket`]: UdpSocket 234 /// [`std::net::UdpSocket`]: std::net::UdpSocket 235 /// [`set_nonblocking`]: fn@std::net::UdpSocket::set_nonblocking into_std(self) -> io::Result<std::net::UdpSocket>236 pub fn into_std(self) -> io::Result<std::net::UdpSocket> { 237 #[cfg(unix)] 238 { 239 use std::os::unix::io::{FromRawFd, IntoRawFd}; 240 self.io 241 .into_inner() 242 .map(|io| io.into_raw_fd()) 243 .map(|raw_fd| unsafe { std::net::UdpSocket::from_raw_fd(raw_fd) }) 244 } 245 246 #[cfg(windows)] 247 { 248 use std::os::windows::io::{FromRawSocket, IntoRawSocket}; 249 self.io 250 .into_inner() 251 .map(|io| io.into_raw_socket()) 252 .map(|raw_socket| unsafe { std::net::UdpSocket::from_raw_socket(raw_socket) }) 253 } 254 } 255 256 /// Returns the local address that this socket is bound to. 257 /// 258 /// # Example 259 /// 260 /// ```no_run 261 /// use tokio::net::UdpSocket; 262 /// # use std::{io, net::SocketAddr}; 263 /// 264 /// # #[tokio::main] 265 /// # async fn main() -> io::Result<()> { 266 /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap(); 267 /// let sock = UdpSocket::bind(addr).await?; 268 /// // the address the socket is bound to 269 /// let local_addr = sock.local_addr()?; 270 /// # Ok(()) 271 /// # } 272 /// ``` local_addr(&self) -> io::Result<SocketAddr>273 pub fn local_addr(&self) -> io::Result<SocketAddr> { 274 self.io.local_addr() 275 } 276 277 /// Connects the UDP socket setting the default destination for send() and 278 /// limiting packets that are read via recv from the address specified in 279 /// `addr`. 280 /// 281 /// # Example 282 /// 283 /// ```no_run 284 /// use tokio::net::UdpSocket; 285 /// # use std::{io, net::SocketAddr}; 286 /// 287 /// # #[tokio::main] 288 /// # async fn main() -> io::Result<()> { 289 /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?; 290 /// 291 /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap(); 292 /// sock.connect(remote_addr).await?; 293 /// let mut buf = [0u8; 32]; 294 /// // recv from remote_addr 295 /// let len = sock.recv(&mut buf).await?; 296 /// // send to remote_addr 297 /// let _len = sock.send(&buf[..len]).await?; 298 /// # Ok(()) 299 /// # } 300 /// ``` connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()>301 pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> { 302 let addrs = to_socket_addrs(addr).await?; 303 let mut last_err = None; 304 305 for addr in addrs { 306 match self.io.connect(addr) { 307 Ok(_) => return Ok(()), 308 Err(e) => last_err = Some(e), 309 } 310 } 311 312 Err(last_err.unwrap_or_else(|| { 313 io::Error::new( 314 io::ErrorKind::InvalidInput, 315 "could not resolve to any address", 316 ) 317 })) 318 } 319 320 /// Waits for any of the requested ready states. 321 /// 322 /// This function is usually paired with `try_recv()` or `try_send()`. It 323 /// can be used to concurrently recv / send to the same socket on a single 324 /// task without splitting the socket. 325 /// 326 /// The function may complete without the socket being ready. This is a 327 /// false-positive and attempting an operation will return with 328 /// `io::ErrorKind::WouldBlock`. 329 /// 330 /// # Cancel safety 331 /// 332 /// This method is cancel safe. Once a readiness event occurs, the method 333 /// will continue to return immediately until the readiness event is 334 /// consumed by an attempt to read or write that fails with `WouldBlock` or 335 /// `Poll::Pending`. 336 /// 337 /// # Examples 338 /// 339 /// Concurrently receive from and send to the socket on the same task 340 /// without splitting. 341 /// 342 /// ```no_run 343 /// use tokio::io::{self, Interest}; 344 /// use tokio::net::UdpSocket; 345 /// 346 /// #[tokio::main] 347 /// async fn main() -> io::Result<()> { 348 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 349 /// socket.connect("127.0.0.1:8081").await?; 350 /// 351 /// loop { 352 /// let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?; 353 /// 354 /// if ready.is_readable() { 355 /// // The buffer is **not** included in the async task and will only exist 356 /// // on the stack. 357 /// let mut data = [0; 1024]; 358 /// match socket.try_recv(&mut data[..]) { 359 /// Ok(n) => { 360 /// println!("received {:?}", &data[..n]); 361 /// } 362 /// // False-positive, continue 363 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} 364 /// Err(e) => { 365 /// return Err(e); 366 /// } 367 /// } 368 /// } 369 /// 370 /// if ready.is_writable() { 371 /// // Write some data 372 /// match socket.try_send(b"hello world") { 373 /// Ok(n) => { 374 /// println!("sent {} bytes", n); 375 /// } 376 /// // False-positive, continue 377 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {} 378 /// Err(e) => { 379 /// return Err(e); 380 /// } 381 /// } 382 /// } 383 /// } 384 /// } 385 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>386 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 387 let event = self.io.registration().readiness(interest).await?; 388 Ok(event.ready) 389 } 390 391 /// Waits for the socket to become writable. 392 /// 393 /// This function is equivalent to `ready(Interest::WRITABLE)` and is 394 /// usually paired with `try_send()` or `try_send_to()`. 395 /// 396 /// The function may complete without the socket being writable. This is a 397 /// false-positive and attempting a `try_send()` will return with 398 /// `io::ErrorKind::WouldBlock`. 399 /// 400 /// # Cancel safety 401 /// 402 /// This method is cancel safe. Once a readiness event occurs, the method 403 /// will continue to return immediately until the readiness event is 404 /// consumed by an attempt to write that fails with `WouldBlock` or 405 /// `Poll::Pending`. 406 /// 407 /// # Examples 408 /// 409 /// ```no_run 410 /// use tokio::net::UdpSocket; 411 /// use std::io; 412 /// 413 /// #[tokio::main] 414 /// async fn main() -> io::Result<()> { 415 /// // Bind socket 416 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 417 /// socket.connect("127.0.0.1:8081").await?; 418 /// 419 /// loop { 420 /// // Wait for the socket to be writable 421 /// socket.writable().await?; 422 /// 423 /// // Try to send data, this may still fail with `WouldBlock` 424 /// // if the readiness event is a false positive. 425 /// match socket.try_send(b"hello world") { 426 /// Ok(n) => { 427 /// break; 428 /// } 429 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 430 /// continue; 431 /// } 432 /// Err(e) => { 433 /// return Err(e); 434 /// } 435 /// } 436 /// } 437 /// 438 /// Ok(()) 439 /// } 440 /// ``` writable(&self) -> io::Result<()>441 pub async fn writable(&self) -> io::Result<()> { 442 self.ready(Interest::WRITABLE).await?; 443 Ok(()) 444 } 445 446 /// Polls for write/send readiness. 447 /// 448 /// If the udp stream is not currently ready for sending, this method will 449 /// store a clone of the `Waker` from the provided `Context`. When the udp 450 /// stream becomes ready for sending, `Waker::wake` will be called on the 451 /// waker. 452 /// 453 /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only 454 /// the `Waker` from the `Context` passed to the most recent call is 455 /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a 456 /// second, independent waker.) 457 /// 458 /// This function is intended for cases where creating and pinning a future 459 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 460 /// preferred, as this supports polling from multiple tasks at once. 461 /// 462 /// # Return value 463 /// 464 /// The function returns: 465 /// 466 /// * `Poll::Pending` if the udp stream is not ready for writing. 467 /// * `Poll::Ready(Ok(()))` if the udp stream is ready for writing. 468 /// * `Poll::Ready(Err(e))` if an error is encountered. 469 /// 470 /// # Errors 471 /// 472 /// This function may encounter any standard I/O error except `WouldBlock`. 473 /// 474 /// [`writable`]: method@Self::writable poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>475 pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 476 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 477 } 478 479 /// Sends data on the socket to the remote address that the socket is 480 /// connected to. 481 /// 482 /// The [`connect`] method will connect this socket to a remote address. 483 /// This method will fail if the socket is not connected. 484 /// 485 /// [`connect`]: method@Self::connect 486 /// 487 /// # Return 488 /// 489 /// On success, the number of bytes sent is returned, otherwise, the 490 /// encountered error is returned. 491 /// 492 /// # Cancel safety 493 /// 494 /// This method is cancel safe. If `send` is used as the event in a 495 /// [`tokio::select!`](crate::select) statement and some other branch 496 /// completes first, then it is guaranteed that the message was not sent. 497 /// 498 /// # Examples 499 /// 500 /// ```no_run 501 /// use tokio::io; 502 /// use tokio::net::UdpSocket; 503 /// 504 /// #[tokio::main] 505 /// async fn main() -> io::Result<()> { 506 /// // Bind socket 507 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 508 /// socket.connect("127.0.0.1:8081").await?; 509 /// 510 /// // Send a message 511 /// socket.send(b"hello world").await?; 512 /// 513 /// Ok(()) 514 /// } 515 /// ``` send(&self, buf: &[u8]) -> io::Result<usize>516 pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { 517 self.io 518 .registration() 519 .async_io(Interest::WRITABLE, || self.io.send(buf)) 520 .await 521 } 522 523 /// Attempts to send data on the socket to the remote address to which it 524 /// was previously `connect`ed. 525 /// 526 /// The [`connect`] method will connect this socket to a remote address. 527 /// This method will fail if the socket is not connected. 528 /// 529 /// Note that on multiple calls to a `poll_*` method in the send direction, 530 /// only the `Waker` from the `Context` passed to the most recent call will 531 /// be scheduled to receive a wakeup. 532 /// 533 /// # Return value 534 /// 535 /// The function returns: 536 /// 537 /// * `Poll::Pending` if the socket is not available to write 538 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent 539 /// * `Poll::Ready(Err(e))` if an error is encountered. 540 /// 541 /// # Errors 542 /// 543 /// This function may encounter any standard I/O error except `WouldBlock`. 544 /// 545 /// [`connect`]: method@Self::connect poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>546 pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> { 547 self.io 548 .registration() 549 .poll_write_io(cx, || self.io.send(buf)) 550 } 551 552 /// Tries to send data on the socket to the remote address to which it is 553 /// connected. 554 /// 555 /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is 556 /// returned. This function is usually paired with `writable()`. 557 /// 558 /// # Returns 559 /// 560 /// If successful, `Ok(n)` is returned, where `n` is the number of bytes 561 /// sent. If the socket is not ready to send data, 562 /// `Err(ErrorKind::WouldBlock)` is returned. 563 /// 564 /// # Examples 565 /// 566 /// ```no_run 567 /// use tokio::net::UdpSocket; 568 /// use std::io; 569 /// 570 /// #[tokio::main] 571 /// async fn main() -> io::Result<()> { 572 /// // Bind a UDP socket 573 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 574 /// 575 /// // Connect to a peer 576 /// socket.connect("127.0.0.1:8081").await?; 577 /// 578 /// loop { 579 /// // Wait for the socket to be writable 580 /// socket.writable().await?; 581 /// 582 /// // Try to send data, this may still fail with `WouldBlock` 583 /// // if the readiness event is a false positive. 584 /// match socket.try_send(b"hello world") { 585 /// Ok(n) => { 586 /// break; 587 /// } 588 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 589 /// continue; 590 /// } 591 /// Err(e) => { 592 /// return Err(e); 593 /// } 594 /// } 595 /// } 596 /// 597 /// Ok(()) 598 /// } 599 /// ``` try_send(&self, buf: &[u8]) -> io::Result<usize>600 pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> { 601 self.io 602 .registration() 603 .try_io(Interest::WRITABLE, || self.io.send(buf)) 604 } 605 606 /// Waits for the socket to become readable. 607 /// 608 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 609 /// paired with `try_recv()`. 610 /// 611 /// The function may complete without the socket being readable. This is a 612 /// false-positive and attempting a `try_recv()` will return with 613 /// `io::ErrorKind::WouldBlock`. 614 /// 615 /// # Cancel safety 616 /// 617 /// This method is cancel safe. Once a readiness event occurs, the method 618 /// will continue to return immediately until the readiness event is 619 /// consumed by an attempt to read that fails with `WouldBlock` or 620 /// `Poll::Pending`. 621 /// 622 /// # Examples 623 /// 624 /// ```no_run 625 /// use tokio::net::UdpSocket; 626 /// use std::io; 627 /// 628 /// #[tokio::main] 629 /// async fn main() -> io::Result<()> { 630 /// // Connect to a peer 631 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 632 /// socket.connect("127.0.0.1:8081").await?; 633 /// 634 /// loop { 635 /// // Wait for the socket to be readable 636 /// socket.readable().await?; 637 /// 638 /// // The buffer is **not** included in the async task and will 639 /// // only exist on the stack. 640 /// let mut buf = [0; 1024]; 641 /// 642 /// // Try to recv data, this may still fail with `WouldBlock` 643 /// // if the readiness event is a false positive. 644 /// match socket.try_recv(&mut buf) { 645 /// Ok(n) => { 646 /// println!("GOT {:?}", &buf[..n]); 647 /// break; 648 /// } 649 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 650 /// continue; 651 /// } 652 /// Err(e) => { 653 /// return Err(e); 654 /// } 655 /// } 656 /// } 657 /// 658 /// Ok(()) 659 /// } 660 /// ``` readable(&self) -> io::Result<()>661 pub async fn readable(&self) -> io::Result<()> { 662 self.ready(Interest::READABLE).await?; 663 Ok(()) 664 } 665 666 /// Polls for read/receive readiness. 667 /// 668 /// If the udp stream is not currently ready for receiving, this method will 669 /// store a clone of the `Waker` from the provided `Context`. When the udp 670 /// socket becomes ready for reading, `Waker::wake` will be called on the 671 /// waker. 672 /// 673 /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or 674 /// `poll_peek`, only the `Waker` from the `Context` passed to the most 675 /// recent call is scheduled to receive a wakeup. (However, 676 /// `poll_send_ready` retains a second, independent waker.) 677 /// 678 /// This function is intended for cases where creating and pinning a future 679 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 680 /// preferred, as this supports polling from multiple tasks at once. 681 /// 682 /// # Return value 683 /// 684 /// The function returns: 685 /// 686 /// * `Poll::Pending` if the udp stream is not ready for reading. 687 /// * `Poll::Ready(Ok(()))` if the udp stream is ready for reading. 688 /// * `Poll::Ready(Err(e))` if an error is encountered. 689 /// 690 /// # Errors 691 /// 692 /// This function may encounter any standard I/O error except `WouldBlock`. 693 /// 694 /// [`readable`]: method@Self::readable poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>695 pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 696 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 697 } 698 699 /// Receives a single datagram message on the socket from the remote address 700 /// to which it is connected. On success, returns the number of bytes read. 701 /// 702 /// The function must be called with valid byte array `buf` of sufficient 703 /// size to hold the message bytes. If a message is too long to fit in the 704 /// supplied buffer, excess bytes may be discarded. 705 /// 706 /// The [`connect`] method will connect this socket to a remote address. 707 /// This method will fail if the socket is not connected. 708 /// 709 /// # Cancel safety 710 /// 711 /// This method is cancel safe. If `recv_from` is used as the event in a 712 /// [`tokio::select!`](crate::select) statement and some other branch 713 /// completes first, it is guaranteed that no messages were received on this 714 /// socket. 715 /// 716 /// [`connect`]: method@Self::connect 717 /// 718 /// ```no_run 719 /// use tokio::net::UdpSocket; 720 /// use std::io; 721 /// 722 /// #[tokio::main] 723 /// async fn main() -> io::Result<()> { 724 /// // Bind socket 725 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 726 /// socket.connect("127.0.0.1:8081").await?; 727 /// 728 /// let mut buf = vec![0; 10]; 729 /// let n = socket.recv(&mut buf).await?; 730 /// 731 /// println!("received {} bytes {:?}", n, &buf[..n]); 732 /// 733 /// Ok(()) 734 /// } 735 /// ``` recv(&self, buf: &mut [u8]) -> io::Result<usize>736 pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { 737 self.io 738 .registration() 739 .async_io(Interest::READABLE, || self.io.recv(buf)) 740 .await 741 } 742 743 /// Attempts to receive a single datagram message on the socket from the remote 744 /// address to which it is `connect`ed. 745 /// 746 /// The [`connect`] method will connect this socket to a remote address. This method 747 /// resolves to an error if the socket is not connected. 748 /// 749 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 750 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 751 /// receive a wakeup. 752 /// 753 /// # Return value 754 /// 755 /// The function returns: 756 /// 757 /// * `Poll::Pending` if the socket is not ready to read 758 /// * `Poll::Ready(Ok(()))` reads data `ReadBuf` if the socket is ready 759 /// * `Poll::Ready(Err(e))` if an error is encountered. 760 /// 761 /// # Errors 762 /// 763 /// This function may encounter any standard I/O error except `WouldBlock`. 764 /// 765 /// [`connect`]: method@Self::connect poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>>766 pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> { 767 let n = ready!(self.io.registration().poll_read_io(cx, || { 768 // Safety: will not read the maybe uninitialized bytes. 769 let b = unsafe { 770 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 771 }; 772 773 self.io.recv(b) 774 }))?; 775 776 // Safety: We trust `recv` to have filled up `n` bytes in the buffer. 777 unsafe { 778 buf.assume_init(n); 779 } 780 buf.advance(n); 781 Poll::Ready(Ok(())) 782 } 783 784 /// Tries to receive a single datagram message on the socket from the remote 785 /// address to which it is connected. On success, returns the number of 786 /// bytes read. 787 /// 788 /// The function must be called with valid byte array buf of sufficient size 789 /// to hold the message bytes. If a message is too long to fit in the 790 /// supplied buffer, excess bytes may be discarded. 791 /// 792 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is 793 /// returned. This function is usually paired with `readable()`. 794 /// 795 /// # Examples 796 /// 797 /// ```no_run 798 /// use tokio::net::UdpSocket; 799 /// use std::io; 800 /// 801 /// #[tokio::main] 802 /// async fn main() -> io::Result<()> { 803 /// // Connect to a peer 804 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 805 /// socket.connect("127.0.0.1:8081").await?; 806 /// 807 /// loop { 808 /// // Wait for the socket to be readable 809 /// socket.readable().await?; 810 /// 811 /// // The buffer is **not** included in the async task and will 812 /// // only exist on the stack. 813 /// let mut buf = [0; 1024]; 814 /// 815 /// // Try to recv data, this may still fail with `WouldBlock` 816 /// // if the readiness event is a false positive. 817 /// match socket.try_recv(&mut buf) { 818 /// Ok(n) => { 819 /// println!("GOT {:?}", &buf[..n]); 820 /// break; 821 /// } 822 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 823 /// continue; 824 /// } 825 /// Err(e) => { 826 /// return Err(e); 827 /// } 828 /// } 829 /// } 830 /// 831 /// Ok(()) 832 /// } 833 /// ``` try_recv(&self, buf: &mut [u8]) -> io::Result<usize>834 pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> { 835 self.io 836 .registration() 837 .try_io(Interest::READABLE, || self.io.recv(buf)) 838 } 839 840 cfg_io_util! { 841 /// Tries to receive data from the stream into the provided buffer, advancing the 842 /// buffer's internal cursor, returning how many bytes were read. 843 /// 844 /// The function must be called with valid byte array buf of sufficient size 845 /// to hold the message bytes. If a message is too long to fit in the 846 /// supplied buffer, excess bytes may be discarded. 847 /// 848 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is 849 /// returned. This function is usually paired with `readable()`. 850 /// 851 /// # Examples 852 /// 853 /// ```no_run 854 /// use tokio::net::UdpSocket; 855 /// use std::io; 856 /// 857 /// #[tokio::main] 858 /// async fn main() -> io::Result<()> { 859 /// // Connect to a peer 860 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 861 /// socket.connect("127.0.0.1:8081").await?; 862 /// 863 /// loop { 864 /// // Wait for the socket to be readable 865 /// socket.readable().await?; 866 /// 867 /// let mut buf = Vec::with_capacity(1024); 868 /// 869 /// // Try to recv data, this may still fail with `WouldBlock` 870 /// // if the readiness event is a false positive. 871 /// match socket.try_recv_buf(&mut buf) { 872 /// Ok(n) => { 873 /// println!("GOT {:?}", &buf[..n]); 874 /// break; 875 /// } 876 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 877 /// continue; 878 /// } 879 /// Err(e) => { 880 /// return Err(e); 881 /// } 882 /// } 883 /// } 884 /// 885 /// Ok(()) 886 /// } 887 /// ``` 888 pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { 889 self.io.registration().try_io(Interest::READABLE, || { 890 let dst = buf.chunk_mut(); 891 let dst = 892 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 893 894 // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the 895 // buffer. 896 let n = (&*self.io).recv(dst)?; 897 898 unsafe { 899 buf.advance_mut(n); 900 } 901 902 Ok(n) 903 }) 904 } 905 906 /// Tries to receive a single datagram message on the socket. On success, 907 /// returns the number of bytes read and the origin. 908 /// 909 /// The function must be called with valid byte array buf of sufficient size 910 /// to hold the message bytes. If a message is too long to fit in the 911 /// supplied buffer, excess bytes may be discarded. 912 /// 913 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is 914 /// returned. This function is usually paired with `readable()`. 915 /// 916 /// # Examples 917 /// 918 /// ```no_run 919 /// use tokio::net::UdpSocket; 920 /// use std::io; 921 /// 922 /// #[tokio::main] 923 /// async fn main() -> io::Result<()> { 924 /// // Connect to a peer 925 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 926 /// 927 /// loop { 928 /// // Wait for the socket to be readable 929 /// socket.readable().await?; 930 /// 931 /// let mut buf = Vec::with_capacity(1024); 932 /// 933 /// // Try to recv data, this may still fail with `WouldBlock` 934 /// // if the readiness event is a false positive. 935 /// match socket.try_recv_buf_from(&mut buf) { 936 /// Ok((n, _addr)) => { 937 /// println!("GOT {:?}", &buf[..n]); 938 /// break; 939 /// } 940 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 941 /// continue; 942 /// } 943 /// Err(e) => { 944 /// return Err(e); 945 /// } 946 /// } 947 /// } 948 /// 949 /// Ok(()) 950 /// } 951 /// ``` 952 pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> { 953 self.io.registration().try_io(Interest::READABLE, || { 954 let dst = buf.chunk_mut(); 955 let dst = 956 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 957 958 // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the 959 // buffer. 960 let (n, addr) = (&*self.io).recv_from(dst)?; 961 962 unsafe { 963 buf.advance_mut(n); 964 } 965 966 Ok((n, addr)) 967 }) 968 } 969 } 970 971 /// Sends data on the socket to the given address. On success, returns the 972 /// number of bytes written. 973 /// 974 /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its 975 /// documentation for concrete examples. 976 /// 977 /// It is possible for `addr` to yield multiple addresses, but `send_to` 978 /// will only send data to the first address yielded by `addr`. 979 /// 980 /// This will return an error when the IP version of the local socket does 981 /// not match that returned from [`ToSocketAddrs`]. 982 /// 983 /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs 984 /// 985 /// # Cancel safety 986 /// 987 /// This method is cancel safe. If `send_to` is used as the event in a 988 /// [`tokio::select!`](crate::select) statement and some other branch 989 /// completes first, then it is guaranteed that the message was not sent. 990 /// 991 /// # Example 992 /// 993 /// ```no_run 994 /// use tokio::net::UdpSocket; 995 /// use std::io; 996 /// 997 /// #[tokio::main] 998 /// async fn main() -> io::Result<()> { 999 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1000 /// let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?; 1001 /// 1002 /// println!("Sent {} bytes", len); 1003 /// 1004 /// Ok(()) 1005 /// } 1006 /// ``` send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize>1007 pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> { 1008 let mut addrs = to_socket_addrs(target).await?; 1009 1010 match addrs.next() { 1011 Some(target) => self.send_to_addr(buf, target).await, 1012 None => Err(io::Error::new( 1013 io::ErrorKind::InvalidInput, 1014 "no addresses to send data to", 1015 )), 1016 } 1017 } 1018 1019 /// Attempts to send data on the socket to a given address. 1020 /// 1021 /// Note that on multiple calls to a `poll_*` method in the send direction, only the 1022 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1023 /// receive a wakeup. 1024 /// 1025 /// # Return value 1026 /// 1027 /// The function returns: 1028 /// 1029 /// * `Poll::Pending` if the socket is not ready to write 1030 /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent. 1031 /// * `Poll::Ready(Err(e))` if an error is encountered. 1032 /// 1033 /// # Errors 1034 /// 1035 /// This function may encounter any standard I/O error except `WouldBlock`. poll_send_to( &self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr, ) -> Poll<io::Result<usize>>1036 pub fn poll_send_to( 1037 &self, 1038 cx: &mut Context<'_>, 1039 buf: &[u8], 1040 target: SocketAddr, 1041 ) -> Poll<io::Result<usize>> { 1042 self.io 1043 .registration() 1044 .poll_write_io(cx, || self.io.send_to(buf, target)) 1045 } 1046 1047 /// Tries to send data on the socket to the given address, but if the send is 1048 /// blocked this will return right away. 1049 /// 1050 /// This function is usually paired with `writable()`. 1051 /// 1052 /// # Returns 1053 /// 1054 /// If successful, returns the number of bytes sent 1055 /// 1056 /// Users should ensure that when the remote cannot receive, the 1057 /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur 1058 /// if the IP version of the socket does not match that of `target`. 1059 /// 1060 /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock 1061 /// 1062 /// # Example 1063 /// 1064 /// ```no_run 1065 /// use tokio::net::UdpSocket; 1066 /// use std::error::Error; 1067 /// use std::io; 1068 /// 1069 /// #[tokio::main] 1070 /// async fn main() -> Result<(), Box<dyn Error>> { 1071 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1072 /// 1073 /// let dst = "127.0.0.1:8081".parse()?; 1074 /// 1075 /// loop { 1076 /// socket.writable().await?; 1077 /// 1078 /// match socket.try_send_to(&b"hello world"[..], dst) { 1079 /// Ok(sent) => { 1080 /// println!("sent {} bytes", sent); 1081 /// break; 1082 /// } 1083 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1084 /// // Writable false positive. 1085 /// continue; 1086 /// } 1087 /// Err(e) => return Err(e.into()), 1088 /// } 1089 /// } 1090 /// 1091 /// Ok(()) 1092 /// } 1093 /// ``` try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>1094 pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { 1095 self.io 1096 .registration() 1097 .try_io(Interest::WRITABLE, || self.io.send_to(buf, target)) 1098 } 1099 send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>1100 async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> { 1101 self.io 1102 .registration() 1103 .async_io(Interest::WRITABLE, || self.io.send_to(buf, target)) 1104 .await 1105 } 1106 1107 /// Receives a single datagram message on the socket. On success, returns 1108 /// the number of bytes read and the origin. 1109 /// 1110 /// The function must be called with valid byte array `buf` of sufficient 1111 /// size to hold the message bytes. If a message is too long to fit in the 1112 /// supplied buffer, excess bytes may be discarded. 1113 /// 1114 /// # Cancel safety 1115 /// 1116 /// This method is cancel safe. If `recv_from` is used as the event in a 1117 /// [`tokio::select!`](crate::select) statement and some other branch 1118 /// completes first, it is guaranteed that no messages were received on this 1119 /// socket. 1120 /// 1121 /// # Example 1122 /// 1123 /// ```no_run 1124 /// use tokio::net::UdpSocket; 1125 /// use std::io; 1126 /// 1127 /// #[tokio::main] 1128 /// async fn main() -> io::Result<()> { 1129 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1130 /// 1131 /// let mut buf = vec![0u8; 32]; 1132 /// let (len, addr) = socket.recv_from(&mut buf).await?; 1133 /// 1134 /// println!("received {:?} bytes from {:?}", len, addr); 1135 /// 1136 /// Ok(()) 1137 /// } 1138 /// ``` recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1139 pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1140 self.io 1141 .registration() 1142 .async_io(Interest::READABLE, || self.io.recv_from(buf)) 1143 .await 1144 } 1145 1146 /// Attempts to receive a single datagram on the socket. 1147 /// 1148 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 1149 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1150 /// receive a wakeup. 1151 /// 1152 /// # Return value 1153 /// 1154 /// The function returns: 1155 /// 1156 /// * `Poll::Pending` if the socket is not ready to read 1157 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready 1158 /// * `Poll::Ready(Err(e))` if an error is encountered. 1159 /// 1160 /// # Errors 1161 /// 1162 /// This function may encounter any standard I/O error except `WouldBlock`. poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1163 pub fn poll_recv_from( 1164 &self, 1165 cx: &mut Context<'_>, 1166 buf: &mut ReadBuf<'_>, 1167 ) -> Poll<io::Result<SocketAddr>> { 1168 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { 1169 // Safety: will not read the maybe uninitialized bytes. 1170 let b = unsafe { 1171 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 1172 }; 1173 1174 self.io.recv_from(b) 1175 }))?; 1176 1177 // Safety: We trust `recv` to have filled up `n` bytes in the buffer. 1178 unsafe { 1179 buf.assume_init(n); 1180 } 1181 buf.advance(n); 1182 Poll::Ready(Ok(addr)) 1183 } 1184 1185 /// Tries to receive a single datagram message on the socket. On success, 1186 /// returns the number of bytes read and the origin. 1187 /// 1188 /// The function must be called with valid byte array buf of sufficient size 1189 /// to hold the message bytes. If a message is too long to fit in the 1190 /// supplied buffer, excess bytes may be discarded. 1191 /// 1192 /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is 1193 /// returned. This function is usually paired with `readable()`. 1194 /// 1195 /// # Examples 1196 /// 1197 /// ```no_run 1198 /// use tokio::net::UdpSocket; 1199 /// use std::io; 1200 /// 1201 /// #[tokio::main] 1202 /// async fn main() -> io::Result<()> { 1203 /// // Connect to a peer 1204 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1205 /// 1206 /// loop { 1207 /// // Wait for the socket to be readable 1208 /// socket.readable().await?; 1209 /// 1210 /// // The buffer is **not** included in the async task and will 1211 /// // only exist on the stack. 1212 /// let mut buf = [0; 1024]; 1213 /// 1214 /// // Try to recv data, this may still fail with `WouldBlock` 1215 /// // if the readiness event is a false positive. 1216 /// match socket.try_recv_from(&mut buf) { 1217 /// Ok((n, _addr)) => { 1218 /// println!("GOT {:?}", &buf[..n]); 1219 /// break; 1220 /// } 1221 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 1222 /// continue; 1223 /// } 1224 /// Err(e) => { 1225 /// return Err(e); 1226 /// } 1227 /// } 1228 /// } 1229 /// 1230 /// Ok(()) 1231 /// } 1232 /// ``` try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1233 pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1234 self.io 1235 .registration() 1236 .try_io(Interest::READABLE, || self.io.recv_from(buf)) 1237 } 1238 1239 /// Tries to read or write from the socket using a user-provided IO operation. 1240 /// 1241 /// If the socket is ready, the provided closure is called. The closure 1242 /// should attempt to perform IO operation from the socket by manually 1243 /// calling the appropriate syscall. If the operation fails because the 1244 /// socket is not actually ready, then the closure should return a 1245 /// `WouldBlock` error and the readiness flag is cleared. The return value 1246 /// of the closure is then returned by `try_io`. 1247 /// 1248 /// If the socket is not ready, then the closure is not called 1249 /// and a `WouldBlock` error is returned. 1250 /// 1251 /// The closure should only return a `WouldBlock` error if it has performed 1252 /// an IO operation on the socket that failed due to the socket not being 1253 /// ready. Returning a `WouldBlock` error in any other situation will 1254 /// incorrectly clear the readiness flag, which can cause the socket to 1255 /// behave incorrectly. 1256 /// 1257 /// The closure should not perform the IO operation using any of the methods 1258 /// defined on the Tokio `UdpSocket` type, as this will mess with the 1259 /// readiness flag and can cause the socket to behave incorrectly. 1260 /// 1261 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 1262 /// 1263 /// [`readable()`]: UdpSocket::readable() 1264 /// [`writable()`]: UdpSocket::writable() 1265 /// [`ready()`]: UdpSocket::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1266 pub fn try_io<R>( 1267 &self, 1268 interest: Interest, 1269 f: impl FnOnce() -> io::Result<R>, 1270 ) -> io::Result<R> { 1271 self.io.registration().try_io(interest, f) 1272 } 1273 1274 /// Receives data from the socket, without removing it from the input queue. 1275 /// On success, returns the number of bytes read and the address from whence 1276 /// the data came. 1277 /// 1278 /// # Notes 1279 /// 1280 /// On Windows, if the data is larger than the buffer specified, the buffer 1281 /// is filled with the first part of the data, and peek_from returns the error 1282 /// WSAEMSGSIZE(10040). The excess data is lost. 1283 /// Make sure to always use a sufficiently large buffer to hold the 1284 /// maximum UDP packet size, which can be up to 65536 bytes in size. 1285 /// 1286 /// # Examples 1287 /// 1288 /// ```no_run 1289 /// use tokio::net::UdpSocket; 1290 /// use std::io; 1291 /// 1292 /// #[tokio::main] 1293 /// async fn main() -> io::Result<()> { 1294 /// let socket = UdpSocket::bind("127.0.0.1:8080").await?; 1295 /// 1296 /// let mut buf = vec![0u8; 32]; 1297 /// let (len, addr) = socket.peek_from(&mut buf).await?; 1298 /// 1299 /// println!("peeked {:?} bytes from {:?}", len, addr); 1300 /// 1301 /// Ok(()) 1302 /// } 1303 /// ``` peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1304 pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { 1305 self.io 1306 .registration() 1307 .async_io(Interest::READABLE, || self.io.peek_from(buf)) 1308 .await 1309 } 1310 1311 /// Receives data from the socket, without removing it from the input queue. 1312 /// On success, returns the number of bytes read. 1313 /// 1314 /// # Notes 1315 /// 1316 /// Note that on multiple calls to a `poll_*` method in the recv direction, only the 1317 /// `Waker` from the `Context` passed to the most recent call will be scheduled to 1318 /// receive a wakeup 1319 /// 1320 /// On Windows, if the data is larger than the buffer specified, the buffer 1321 /// is filled with the first part of the data, and peek returns the error 1322 /// WSAEMSGSIZE(10040). The excess data is lost. 1323 /// Make sure to always use a sufficiently large buffer to hold the 1324 /// maximum UDP packet size, which can be up to 65536 bytes in size. 1325 /// 1326 /// # Return value 1327 /// 1328 /// The function returns: 1329 /// 1330 /// * `Poll::Pending` if the socket is not ready to read 1331 /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready 1332 /// * `Poll::Ready(Err(e))` if an error is encountered. 1333 /// 1334 /// # Errors 1335 /// 1336 /// This function may encounter any standard I/O error except `WouldBlock`. poll_peek_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1337 pub fn poll_peek_from( 1338 &self, 1339 cx: &mut Context<'_>, 1340 buf: &mut ReadBuf<'_>, 1341 ) -> Poll<io::Result<SocketAddr>> { 1342 let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || { 1343 // Safety: will not read the maybe uninitialized bytes. 1344 let b = unsafe { 1345 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 1346 }; 1347 1348 self.io.peek_from(b) 1349 }))?; 1350 1351 // Safety: We trust `recv` to have filled up `n` bytes in the buffer. 1352 unsafe { 1353 buf.assume_init(n); 1354 } 1355 buf.advance(n); 1356 Poll::Ready(Ok(addr)) 1357 } 1358 1359 /// Gets the value of the `SO_BROADCAST` option for this socket. 1360 /// 1361 /// For more information about this option, see [`set_broadcast`]. 1362 /// 1363 /// [`set_broadcast`]: method@Self::set_broadcast broadcast(&self) -> io::Result<bool>1364 pub fn broadcast(&self) -> io::Result<bool> { 1365 self.io.broadcast() 1366 } 1367 1368 /// Sets the value of the `SO_BROADCAST` option for this socket. 1369 /// 1370 /// When enabled, this socket is allowed to send packets to a broadcast 1371 /// address. set_broadcast(&self, on: bool) -> io::Result<()>1372 pub fn set_broadcast(&self, on: bool) -> io::Result<()> { 1373 self.io.set_broadcast(on) 1374 } 1375 1376 /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket. 1377 /// 1378 /// For more information about this option, see [`set_multicast_loop_v4`]. 1379 /// 1380 /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4 multicast_loop_v4(&self) -> io::Result<bool>1381 pub fn multicast_loop_v4(&self) -> io::Result<bool> { 1382 self.io.multicast_loop_v4() 1383 } 1384 1385 /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket. 1386 /// 1387 /// If enabled, multicast packets will be looped back to the local socket. 1388 /// 1389 /// # Note 1390 /// 1391 /// This may not have any affect on IPv6 sockets. set_multicast_loop_v4(&self, on: bool) -> io::Result<()>1392 pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> { 1393 self.io.set_multicast_loop_v4(on) 1394 } 1395 1396 /// Gets the value of the `IP_MULTICAST_TTL` option for this socket. 1397 /// 1398 /// For more information about this option, see [`set_multicast_ttl_v4`]. 1399 /// 1400 /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4 multicast_ttl_v4(&self) -> io::Result<u32>1401 pub fn multicast_ttl_v4(&self) -> io::Result<u32> { 1402 self.io.multicast_ttl_v4() 1403 } 1404 1405 /// Sets the value of the `IP_MULTICAST_TTL` option for this socket. 1406 /// 1407 /// Indicates the time-to-live value of outgoing multicast packets for 1408 /// this socket. The default value is 1 which means that multicast packets 1409 /// don't leave the local network unless explicitly requested. 1410 /// 1411 /// # Note 1412 /// 1413 /// This may not have any affect on IPv6 sockets. set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()>1414 pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> { 1415 self.io.set_multicast_ttl_v4(ttl) 1416 } 1417 1418 /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket. 1419 /// 1420 /// For more information about this option, see [`set_multicast_loop_v6`]. 1421 /// 1422 /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6 multicast_loop_v6(&self) -> io::Result<bool>1423 pub fn multicast_loop_v6(&self) -> io::Result<bool> { 1424 self.io.multicast_loop_v6() 1425 } 1426 1427 /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket. 1428 /// 1429 /// Controls whether this socket sees the multicast packets it sends itself. 1430 /// 1431 /// # Note 1432 /// 1433 /// This may not have any affect on IPv4 sockets. set_multicast_loop_v6(&self, on: bool) -> io::Result<()>1434 pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> { 1435 self.io.set_multicast_loop_v6(on) 1436 } 1437 1438 /// Gets the value of the `IP_TTL` option for this socket. 1439 /// 1440 /// For more information about this option, see [`set_ttl`]. 1441 /// 1442 /// [`set_ttl`]: method@Self::set_ttl 1443 /// 1444 /// # Examples 1445 /// 1446 /// ```no_run 1447 /// use tokio::net::UdpSocket; 1448 /// # use std::io; 1449 /// 1450 /// # async fn dox() -> io::Result<()> { 1451 /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; 1452 /// 1453 /// println!("{:?}", sock.ttl()?); 1454 /// # Ok(()) 1455 /// # } 1456 /// ``` ttl(&self) -> io::Result<u32>1457 pub fn ttl(&self) -> io::Result<u32> { 1458 self.io.ttl() 1459 } 1460 1461 /// Sets the value for the `IP_TTL` option on this socket. 1462 /// 1463 /// This value sets the time-to-live field that is used in every packet sent 1464 /// from this socket. 1465 /// 1466 /// # Examples 1467 /// 1468 /// ```no_run 1469 /// use tokio::net::UdpSocket; 1470 /// # use std::io; 1471 /// 1472 /// # async fn dox() -> io::Result<()> { 1473 /// let sock = UdpSocket::bind("127.0.0.1:8080").await?; 1474 /// sock.set_ttl(60)?; 1475 /// 1476 /// # Ok(()) 1477 /// # } 1478 /// ``` set_ttl(&self, ttl: u32) -> io::Result<()>1479 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 1480 self.io.set_ttl(ttl) 1481 } 1482 1483 /// Executes an operation of the `IP_ADD_MEMBERSHIP` type. 1484 /// 1485 /// This function specifies a new multicast group for this socket to join. 1486 /// The address must be a valid multicast address, and `interface` is the 1487 /// address of the local interface with which the system should join the 1488 /// multicast group. If it's equal to `INADDR_ANY` then an appropriate 1489 /// interface is chosen by the system. join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()>1490 pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { 1491 self.io.join_multicast_v4(&multiaddr, &interface) 1492 } 1493 1494 /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type. 1495 /// 1496 /// This function specifies a new multicast group for this socket to join. 1497 /// The address must be a valid multicast address, and `interface` is the 1498 /// index of the interface to join/leave (or 0 to indicate any interface). join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()>1499 pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { 1500 self.io.join_multicast_v6(multiaddr, interface) 1501 } 1502 1503 /// Executes an operation of the `IP_DROP_MEMBERSHIP` type. 1504 /// 1505 /// For more information about this option, see [`join_multicast_v4`]. 1506 /// 1507 /// [`join_multicast_v4`]: method@Self::join_multicast_v4 leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()>1508 pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> { 1509 self.io.leave_multicast_v4(&multiaddr, &interface) 1510 } 1511 1512 /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type. 1513 /// 1514 /// For more information about this option, see [`join_multicast_v6`]. 1515 /// 1516 /// [`join_multicast_v6`]: method@Self::join_multicast_v6 leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()>1517 pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> { 1518 self.io.leave_multicast_v6(multiaddr, interface) 1519 } 1520 1521 /// Returns the value of the `SO_ERROR` option. 1522 /// 1523 /// # Examples 1524 /// ``` 1525 /// use tokio::net::UdpSocket; 1526 /// use std::io; 1527 /// 1528 /// #[tokio::main] 1529 /// async fn main() -> io::Result<()> { 1530 /// // Create a socket 1531 /// let socket = UdpSocket::bind("0.0.0.0:8080").await?; 1532 /// 1533 /// if let Ok(Some(err)) = socket.take_error() { 1534 /// println!("Got error: {:?}", err); 1535 /// } 1536 /// 1537 /// Ok(()) 1538 /// } 1539 /// ``` take_error(&self) -> io::Result<Option<io::Error>>1540 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 1541 self.io.take_error() 1542 } 1543 } 1544 1545 impl TryFrom<std::net::UdpSocket> for UdpSocket { 1546 type Error = io::Error; 1547 1548 /// Consumes stream, returning the tokio I/O object. 1549 /// 1550 /// This is equivalent to 1551 /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std). try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error>1552 fn try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error> { 1553 Self::from_std(stream) 1554 } 1555 } 1556 1557 impl fmt::Debug for UdpSocket { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1558 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 1559 self.io.fmt(f) 1560 } 1561 } 1562 1563 #[cfg(all(unix))] 1564 mod sys { 1565 use super::UdpSocket; 1566 use std::os::unix::prelude::*; 1567 1568 impl AsRawFd for UdpSocket { as_raw_fd(&self) -> RawFd1569 fn as_raw_fd(&self) -> RawFd { 1570 self.io.as_raw_fd() 1571 } 1572 } 1573 } 1574 1575 #[cfg(windows)] 1576 mod sys { 1577 use super::UdpSocket; 1578 use std::os::windows::prelude::*; 1579 1580 impl AsRawSocket for UdpSocket { as_raw_socket(&self) -> RawSocket1581 fn as_raw_socket(&self) -> RawSocket { 1582 self.io.as_raw_socket() 1583 } 1584 } 1585 } 1586