• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::io::{Interest, PollEvented, ReadBuf, Ready};
2 use crate::net::{to_socket_addrs, ToSocketAddrs};
3 
4 use std::convert::TryFrom;
5 use std::fmt;
6 use std::io;
7 use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr};
8 use std::task::{Context, Poll};
9 
10 cfg_io_util! {
11     use bytes::BufMut;
12 }
13 
14 cfg_net! {
15     /// A UDP socket.
16     ///
17     /// UDP is "connectionless", unlike TCP. Meaning, regardless of what address you've bound to, a `UdpSocket`
18     /// is free to communicate with many different remotes. In tokio there are basically two main ways to use `UdpSocket`:
19     ///
20     /// * one to many: [`bind`](`UdpSocket::bind`) and use [`send_to`](`UdpSocket::send_to`)
21     ///   and [`recv_from`](`UdpSocket::recv_from`) to communicate with many different addresses
22     /// * one to one: [`connect`](`UdpSocket::connect`) and associate with a single address, using [`send`](`UdpSocket::send`)
23     ///   and [`recv`](`UdpSocket::recv`) to communicate only with that remote address
24     ///
25     /// This type does not provide a `split` method, because this functionality
26     /// can be achieved by instead wrapping the socket in an [`Arc`]. Note that
27     /// you do not need a `Mutex` to share the `UdpSocket` — an `Arc<UdpSocket>`
28     /// is enough. This is because all of the methods take `&self` instead of
29     /// `&mut self`. Once you have wrapped it in an `Arc`, you can call
30     /// `.clone()` on the `Arc<UdpSocket>` to get multiple shared handles to the
31     /// same socket. An example of such usage can be found further down.
32     ///
33     /// [`Arc`]: std::sync::Arc
34     ///
35     /// # Streams
36     ///
37     /// If you need to listen over UDP and produce a [`Stream`], you can look
38     /// at [`UdpFramed`].
39     ///
40     /// [`UdpFramed`]: https://docs.rs/tokio-util/latest/tokio_util/udp/struct.UdpFramed.html
41     /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
42     ///
43     /// # Example: one to many (bind)
44     ///
45     /// Using `bind` we can create a simple echo server that sends and recv's with many different clients:
46     /// ```no_run
47     /// use tokio::net::UdpSocket;
48     /// use std::io;
49     ///
50     /// #[tokio::main]
51     /// async fn main() -> io::Result<()> {
52     ///     let sock = UdpSocket::bind("0.0.0.0:8080").await?;
53     ///     let mut buf = [0; 1024];
54     ///     loop {
55     ///         let (len, addr) = sock.recv_from(&mut buf).await?;
56     ///         println!("{:?} bytes received from {:?}", len, addr);
57     ///
58     ///         let len = sock.send_to(&buf[..len], addr).await?;
59     ///         println!("{:?} bytes sent", len);
60     ///     }
61     /// }
62     /// ```
63     ///
64     /// # Example: one to one (connect)
65     ///
66     /// Or using `connect` we can echo with a single remote address using `send` and `recv`:
67     /// ```no_run
68     /// use tokio::net::UdpSocket;
69     /// use std::io;
70     ///
71     /// #[tokio::main]
72     /// async fn main() -> io::Result<()> {
73     ///     let sock = UdpSocket::bind("0.0.0.0:8080").await?;
74     ///
75     ///     let remote_addr = "127.0.0.1:59611";
76     ///     sock.connect(remote_addr).await?;
77     ///     let mut buf = [0; 1024];
78     ///     loop {
79     ///         let len = sock.recv(&mut buf).await?;
80     ///         println!("{:?} bytes received from {:?}", len, remote_addr);
81     ///
82     ///         let len = sock.send(&buf[..len]).await?;
83     ///         println!("{:?} bytes sent", len);
84     ///     }
85     /// }
86     /// ```
87     ///
88     /// # Example: Splitting with `Arc`
89     ///
    /// Because `send_to` and `recv_from` take `&self`, it's perfectly alright
    /// to use an `Arc<UdpSocket>` and share the references to multiple tasks.
    /// Here is a similar "echo" example that supports concurrent
    /// sending/receiving:
94     ///
95     /// ```no_run
96     /// use tokio::{net::UdpSocket, sync::mpsc};
97     /// use std::{io, net::SocketAddr, sync::Arc};
98     ///
99     /// #[tokio::main]
100     /// async fn main() -> io::Result<()> {
101     ///     let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
102     ///     let r = Arc::new(sock);
103     ///     let s = r.clone();
104     ///     let (tx, mut rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(1_000);
105     ///
106     ///     tokio::spawn(async move {
107     ///         while let Some((bytes, addr)) = rx.recv().await {
108     ///             let len = s.send_to(&bytes, &addr).await.unwrap();
109     ///             println!("{:?} bytes sent", len);
110     ///         }
111     ///     });
112     ///
113     ///     let mut buf = [0; 1024];
114     ///     loop {
115     ///         let (len, addr) = r.recv_from(&mut buf).await?;
116     ///         println!("{:?} bytes received from {:?}", len, addr);
117     ///         tx.send((buf[..len].to_vec(), addr)).await.unwrap();
118     ///     }
119     /// }
120     /// ```
121     ///
    pub struct UdpSocket {
        // The mio UDP socket wrapped in tokio's `PollEvented`; the methods of
        // this type perform their socket operations through this handle.
        io: PollEvented<mio::net::UdpSocket>,
    }
125 }
126 
127 impl UdpSocket {
128     /// This function will create a new UDP socket and attempt to bind it to
129     /// the `addr` provided.
130     ///
131     /// # Example
132     ///
133     /// ```no_run
134     /// use tokio::net::UdpSocket;
135     /// use std::io;
136     ///
137     /// #[tokio::main]
138     /// async fn main() -> io::Result<()> {
139     ///     let sock = UdpSocket::bind("0.0.0.0:8080").await?;
140     ///     // use `sock`
141     /// #   let _ = sock;
142     ///     Ok(())
143     /// }
144     /// ```
bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket>145     pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<UdpSocket> {
146         let addrs = to_socket_addrs(addr).await?;
147         let mut last_err = None;
148 
149         for addr in addrs {
150             match UdpSocket::bind_addr(addr) {
151                 Ok(socket) => return Ok(socket),
152                 Err(e) => last_err = Some(e),
153             }
154         }
155 
156         Err(last_err.unwrap_or_else(|| {
157             io::Error::new(
158                 io::ErrorKind::InvalidInput,
159                 "could not resolve to any address",
160             )
161         }))
162     }
163 
bind_addr(addr: SocketAddr) -> io::Result<UdpSocket>164     fn bind_addr(addr: SocketAddr) -> io::Result<UdpSocket> {
165         let sys = mio::net::UdpSocket::bind(addr)?;
166         UdpSocket::new(sys)
167     }
168 
new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket>169     fn new(socket: mio::net::UdpSocket) -> io::Result<UdpSocket> {
170         let io = PollEvented::new(socket)?;
171         Ok(UdpSocket { io })
172     }
173 
174     /// Creates new `UdpSocket` from a previously bound `std::net::UdpSocket`.
175     ///
176     /// This function is intended to be used to wrap a UDP socket from the
177     /// standard library in the Tokio equivalent. The conversion assumes nothing
178     /// about the underlying socket; it is left up to the user to set it in
179     /// non-blocking mode.
180     ///
181     /// This can be used in conjunction with socket2's `Socket` interface to
182     /// configure a socket before it's handed off, such as setting options like
183     /// `reuse_address` or binding to multiple addresses.
184     ///
185     /// # Panics
186     ///
187     /// This function panics if thread-local runtime is not set.
188     ///
189     /// The runtime is usually set implicitly when this function is called
190     /// from a future driven by a tokio runtime, otherwise runtime can be set
191     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
192     ///
193     /// # Example
194     ///
195     /// ```no_run
196     /// use tokio::net::UdpSocket;
197     /// # use std::{io, net::SocketAddr};
198     ///
199     /// # #[tokio::main]
200     /// # async fn main() -> io::Result<()> {
201     /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
202     /// let std_sock = std::net::UdpSocket::bind(addr)?;
203     /// std_sock.set_nonblocking(true)?;
204     /// let sock = UdpSocket::from_std(std_sock)?;
205     /// // use `sock`
206     /// # Ok(())
207     /// # }
208     /// ```
from_std(socket: net::UdpSocket) -> io::Result<UdpSocket>209     pub fn from_std(socket: net::UdpSocket) -> io::Result<UdpSocket> {
210         let io = mio::net::UdpSocket::from_std(socket);
211         UdpSocket::new(io)
212     }
213 
214     /// Turns a [`tokio::net::UdpSocket`] into a [`std::net::UdpSocket`].
215     ///
216     /// The returned [`std::net::UdpSocket`] will have nonblocking mode set as
217     /// `true`.  Use [`set_nonblocking`] to change the blocking mode if needed.
218     ///
219     /// # Examples
220     ///
221     /// ```rust,no_run
222     /// use std::error::Error;
223     ///
224     /// #[tokio::main]
225     /// async fn main() -> Result<(), Box<dyn Error>> {
226     ///     let tokio_socket = tokio::net::UdpSocket::bind("127.0.0.1:0").await?;
227     ///     let std_socket = tokio_socket.into_std()?;
228     ///     std_socket.set_nonblocking(false)?;
229     ///     Ok(())
230     /// }
231     /// ```
232     ///
233     /// [`tokio::net::UdpSocket`]: UdpSocket
234     /// [`std::net::UdpSocket`]: std::net::UdpSocket
235     /// [`set_nonblocking`]: fn@std::net::UdpSocket::set_nonblocking
into_std(self) -> io::Result<std::net::UdpSocket>236     pub fn into_std(self) -> io::Result<std::net::UdpSocket> {
237         #[cfg(unix)]
238         {
239             use std::os::unix::io::{FromRawFd, IntoRawFd};
240             self.io
241                 .into_inner()
242                 .map(|io| io.into_raw_fd())
243                 .map(|raw_fd| unsafe { std::net::UdpSocket::from_raw_fd(raw_fd) })
244         }
245 
246         #[cfg(windows)]
247         {
248             use std::os::windows::io::{FromRawSocket, IntoRawSocket};
249             self.io
250                 .into_inner()
251                 .map(|io| io.into_raw_socket())
252                 .map(|raw_socket| unsafe { std::net::UdpSocket::from_raw_socket(raw_socket) })
253         }
254     }
255 
256     /// Returns the local address that this socket is bound to.
257     ///
258     /// # Example
259     ///
260     /// ```no_run
261     /// use tokio::net::UdpSocket;
262     /// # use std::{io, net::SocketAddr};
263     ///
264     /// # #[tokio::main]
265     /// # async fn main() -> io::Result<()> {
266     /// let addr = "0.0.0.0:8080".parse::<SocketAddr>().unwrap();
267     /// let sock = UdpSocket::bind(addr).await?;
268     /// // the address the socket is bound to
269     /// let local_addr = sock.local_addr()?;
270     /// # Ok(())
271     /// # }
272     /// ```
    pub fn local_addr(&self) -> io::Result<SocketAddr> {
        // Pure delegation: the result of `local_addr` on the wrapped I/O
        // handle is returned unchanged, including any error.
        self.io.local_addr()
    }
276 
277     /// Connects the UDP socket setting the default destination for send() and
278     /// limiting packets that are read via recv from the address specified in
279     /// `addr`.
280     ///
281     /// # Example
282     ///
283     /// ```no_run
284     /// use tokio::net::UdpSocket;
285     /// # use std::{io, net::SocketAddr};
286     ///
287     /// # #[tokio::main]
288     /// # async fn main() -> io::Result<()> {
289     /// let sock = UdpSocket::bind("0.0.0.0:8080".parse::<SocketAddr>().unwrap()).await?;
290     ///
291     /// let remote_addr = "127.0.0.1:59600".parse::<SocketAddr>().unwrap();
292     /// sock.connect(remote_addr).await?;
293     /// let mut buf = [0u8; 32];
294     /// // recv from remote_addr
295     /// let len = sock.recv(&mut buf).await?;
296     /// // send to remote_addr
297     /// let _len = sock.send(&buf[..len]).await?;
298     /// # Ok(())
299     /// # }
300     /// ```
connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()>301     pub async fn connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<()> {
302         let addrs = to_socket_addrs(addr).await?;
303         let mut last_err = None;
304 
305         for addr in addrs {
306             match self.io.connect(addr) {
307                 Ok(_) => return Ok(()),
308                 Err(e) => last_err = Some(e),
309             }
310         }
311 
312         Err(last_err.unwrap_or_else(|| {
313             io::Error::new(
314                 io::ErrorKind::InvalidInput,
315                 "could not resolve to any address",
316             )
317         }))
318     }
319 
320     /// Waits for any of the requested ready states.
321     ///
322     /// This function is usually paired with `try_recv()` or `try_send()`. It
323     /// can be used to concurrently recv / send to the same socket on a single
324     /// task without splitting the socket.
325     ///
326     /// The function may complete without the socket being ready. This is a
327     /// false-positive and attempting an operation will return with
328     /// `io::ErrorKind::WouldBlock`.
329     ///
330     /// # Cancel safety
331     ///
332     /// This method is cancel safe. Once a readiness event occurs, the method
333     /// will continue to return immediately until the readiness event is
334     /// consumed by an attempt to read or write that fails with `WouldBlock` or
335     /// `Poll::Pending`.
336     ///
337     /// # Examples
338     ///
339     /// Concurrently receive from and send to the socket on the same task
340     /// without splitting.
341     ///
342     /// ```no_run
343     /// use tokio::io::{self, Interest};
344     /// use tokio::net::UdpSocket;
345     ///
346     /// #[tokio::main]
347     /// async fn main() -> io::Result<()> {
348     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
349     ///     socket.connect("127.0.0.1:8081").await?;
350     ///
351     ///     loop {
352     ///         let ready = socket.ready(Interest::READABLE | Interest::WRITABLE).await?;
353     ///
354     ///         if ready.is_readable() {
355     ///             // The buffer is **not** included in the async task and will only exist
356     ///             // on the stack.
357     ///             let mut data = [0; 1024];
358     ///             match socket.try_recv(&mut data[..]) {
359     ///                 Ok(n) => {
360     ///                     println!("received {:?}", &data[..n]);
361     ///                 }
362     ///                 // False-positive, continue
363     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
364     ///                 Err(e) => {
365     ///                     return Err(e);
366     ///                 }
367     ///             }
368     ///         }
369     ///
370     ///         if ready.is_writable() {
371     ///             // Write some data
372     ///             match socket.try_send(b"hello world") {
373     ///                 Ok(n) => {
374     ///                     println!("sent {} bytes", n);
375     ///                 }
376     ///                 // False-positive, continue
377     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {}
378     ///                 Err(e) => {
379     ///                     return Err(e);
380     ///                 }
381     ///             }
382     ///         }
383     ///     }
384     /// }
385     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>386     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
387         let event = self.io.registration().readiness(interest).await?;
388         Ok(event.ready)
389     }
390 
391     /// Waits for the socket to become writable.
392     ///
393     /// This function is equivalent to `ready(Interest::WRITABLE)` and is
394     /// usually paired with `try_send()` or `try_send_to()`.
395     ///
396     /// The function may complete without the socket being writable. This is a
397     /// false-positive and attempting a `try_send()` will return with
398     /// `io::ErrorKind::WouldBlock`.
399     ///
400     /// # Cancel safety
401     ///
402     /// This method is cancel safe. Once a readiness event occurs, the method
403     /// will continue to return immediately until the readiness event is
404     /// consumed by an attempt to write that fails with `WouldBlock` or
405     /// `Poll::Pending`.
406     ///
407     /// # Examples
408     ///
409     /// ```no_run
410     /// use tokio::net::UdpSocket;
411     /// use std::io;
412     ///
413     /// #[tokio::main]
414     /// async fn main() -> io::Result<()> {
415     ///     // Bind socket
416     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
417     ///     socket.connect("127.0.0.1:8081").await?;
418     ///
419     ///     loop {
420     ///         // Wait for the socket to be writable
421     ///         socket.writable().await?;
422     ///
423     ///         // Try to send data, this may still fail with `WouldBlock`
424     ///         // if the readiness event is a false positive.
425     ///         match socket.try_send(b"hello world") {
426     ///             Ok(n) => {
427     ///                 break;
428     ///             }
429     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
430     ///                 continue;
431     ///             }
432     ///             Err(e) => {
433     ///                 return Err(e);
434     ///             }
435     ///         }
436     ///     }
437     ///
438     ///     Ok(())
439     /// }
440     /// ```
writable(&self) -> io::Result<()>441     pub async fn writable(&self) -> io::Result<()> {
442         self.ready(Interest::WRITABLE).await?;
443         Ok(())
444     }
445 
446     /// Polls for write/send readiness.
447     ///
    /// If the UDP socket is not currently ready for sending, this method will
    /// store a clone of the `Waker` from the provided `Context`. When the UDP
    /// socket becomes ready for sending, `Waker::wake` will be called on the
    /// waker.
452     ///
453     /// Note that on multiple calls to `poll_send_ready` or `poll_send`, only
454     /// the `Waker` from the `Context` passed to the most recent call is
455     /// scheduled to receive a wakeup. (However, `poll_recv_ready` retains a
456     /// second, independent waker.)
457     ///
458     /// This function is intended for cases where creating and pinning a future
459     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
460     /// preferred, as this supports polling from multiple tasks at once.
461     ///
462     /// # Return value
463     ///
464     /// The function returns:
465     ///
    /// * `Poll::Pending` if the UDP socket is not ready for writing.
    /// * `Poll::Ready(Ok(()))` if the UDP socket is ready for writing.
468     /// * `Poll::Ready(Err(e))` if an error is encountered.
469     ///
470     /// # Errors
471     ///
472     /// This function may encounter any standard I/O error except `WouldBlock`.
473     ///
474     /// [`writable`]: method@Self::writable
poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>475     pub fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
476         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
477     }
478 
479     /// Sends data on the socket to the remote address that the socket is
480     /// connected to.
481     ///
482     /// The [`connect`] method will connect this socket to a remote address.
483     /// This method will fail if the socket is not connected.
484     ///
485     /// [`connect`]: method@Self::connect
486     ///
487     /// # Return
488     ///
489     /// On success, the number of bytes sent is returned, otherwise, the
490     /// encountered error is returned.
491     ///
492     /// # Cancel safety
493     ///
494     /// This method is cancel safe. If `send` is used as the event in a
495     /// [`tokio::select!`](crate::select) statement and some other branch
496     /// completes first, then it is guaranteed that the message was not sent.
497     ///
498     /// # Examples
499     ///
500     /// ```no_run
501     /// use tokio::io;
502     /// use tokio::net::UdpSocket;
503     ///
504     /// #[tokio::main]
505     /// async fn main() -> io::Result<()> {
506     ///     // Bind socket
507     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
508     ///     socket.connect("127.0.0.1:8081").await?;
509     ///
510     ///     // Send a message
511     ///     socket.send(b"hello world").await?;
512     ///
513     ///     Ok(())
514     /// }
515     /// ```
send(&self, buf: &[u8]) -> io::Result<usize>516     pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
517         self.io
518             .registration()
519             .async_io(Interest::WRITABLE, || self.io.send(buf))
520             .await
521     }
522 
523     /// Attempts to send data on the socket to the remote address to which it
524     /// was previously `connect`ed.
525     ///
526     /// The [`connect`] method will connect this socket to a remote address.
527     /// This method will fail if the socket is not connected.
528     ///
529     /// Note that on multiple calls to a `poll_*` method in the send direction,
530     /// only the `Waker` from the `Context` passed to the most recent call will
531     /// be scheduled to receive a wakeup.
532     ///
533     /// # Return value
534     ///
535     /// The function returns:
536     ///
537     /// * `Poll::Pending` if the socket is not available to write
538     /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent
539     /// * `Poll::Ready(Err(e))` if an error is encountered.
540     ///
541     /// # Errors
542     ///
543     /// This function may encounter any standard I/O error except `WouldBlock`.
544     ///
545     /// [`connect`]: method@Self::connect
poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>546     pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
547         self.io
548             .registration()
549             .poll_write_io(cx, || self.io.send(buf))
550     }
551 
552     /// Tries to send data on the socket to the remote address to which it is
553     /// connected.
554     ///
555     /// When the socket buffer is full, `Err(io::ErrorKind::WouldBlock)` is
556     /// returned. This function is usually paired with `writable()`.
557     ///
558     /// # Returns
559     ///
560     /// If successful, `Ok(n)` is returned, where `n` is the number of bytes
561     /// sent. If the socket is not ready to send data,
562     /// `Err(ErrorKind::WouldBlock)` is returned.
563     ///
564     /// # Examples
565     ///
566     /// ```no_run
567     /// use tokio::net::UdpSocket;
568     /// use std::io;
569     ///
570     /// #[tokio::main]
571     /// async fn main() -> io::Result<()> {
572     ///     // Bind a UDP socket
573     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
574     ///
575     ///     // Connect to a peer
576     ///     socket.connect("127.0.0.1:8081").await?;
577     ///
578     ///     loop {
579     ///         // Wait for the socket to be writable
580     ///         socket.writable().await?;
581     ///
582     ///         // Try to send data, this may still fail with `WouldBlock`
583     ///         // if the readiness event is a false positive.
584     ///         match socket.try_send(b"hello world") {
585     ///             Ok(n) => {
586     ///                 break;
587     ///             }
588     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
589     ///                 continue;
590     ///             }
591     ///             Err(e) => {
592     ///                 return Err(e);
593     ///             }
594     ///         }
595     ///     }
596     ///
597     ///     Ok(())
598     /// }
599     /// ```
try_send(&self, buf: &[u8]) -> io::Result<usize>600     pub fn try_send(&self, buf: &[u8]) -> io::Result<usize> {
601         self.io
602             .registration()
603             .try_io(Interest::WRITABLE, || self.io.send(buf))
604     }
605 
606     /// Waits for the socket to become readable.
607     ///
608     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
609     /// paired with `try_recv()`.
610     ///
611     /// The function may complete without the socket being readable. This is a
612     /// false-positive and attempting a `try_recv()` will return with
613     /// `io::ErrorKind::WouldBlock`.
614     ///
615     /// # Cancel safety
616     ///
617     /// This method is cancel safe. Once a readiness event occurs, the method
618     /// will continue to return immediately until the readiness event is
619     /// consumed by an attempt to read that fails with `WouldBlock` or
620     /// `Poll::Pending`.
621     ///
622     /// # Examples
623     ///
624     /// ```no_run
625     /// use tokio::net::UdpSocket;
626     /// use std::io;
627     ///
628     /// #[tokio::main]
629     /// async fn main() -> io::Result<()> {
630     ///     // Connect to a peer
631     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
632     ///     socket.connect("127.0.0.1:8081").await?;
633     ///
634     ///     loop {
635     ///         // Wait for the socket to be readable
636     ///         socket.readable().await?;
637     ///
638     ///         // The buffer is **not** included in the async task and will
639     ///         // only exist on the stack.
640     ///         let mut buf = [0; 1024];
641     ///
642     ///         // Try to recv data, this may still fail with `WouldBlock`
643     ///         // if the readiness event is a false positive.
644     ///         match socket.try_recv(&mut buf) {
645     ///             Ok(n) => {
646     ///                 println!("GOT {:?}", &buf[..n]);
647     ///                 break;
648     ///             }
649     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
650     ///                 continue;
651     ///             }
652     ///             Err(e) => {
653     ///                 return Err(e);
654     ///             }
655     ///         }
656     ///     }
657     ///
658     ///     Ok(())
659     /// }
660     /// ```
readable(&self) -> io::Result<()>661     pub async fn readable(&self) -> io::Result<()> {
662         self.ready(Interest::READABLE).await?;
663         Ok(())
664     }
665 
666     /// Polls for read/receive readiness.
667     ///
    /// If the UDP socket is not currently ready for receiving, this method will
    /// store a clone of the `Waker` from the provided `Context`. When the UDP
    /// socket becomes ready for reading, `Waker::wake` will be called on the
    /// waker.
672     ///
673     /// Note that on multiple calls to `poll_recv_ready`, `poll_recv` or
674     /// `poll_peek`, only the `Waker` from the `Context` passed to the most
675     /// recent call is scheduled to receive a wakeup. (However,
676     /// `poll_send_ready` retains a second, independent waker.)
677     ///
678     /// This function is intended for cases where creating and pinning a future
679     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
680     /// preferred, as this supports polling from multiple tasks at once.
681     ///
682     /// # Return value
683     ///
684     /// The function returns:
685     ///
    /// * `Poll::Pending` if the UDP socket is not ready for reading.
    /// * `Poll::Ready(Ok(()))` if the UDP socket is ready for reading.
688     /// * `Poll::Ready(Err(e))` if an error is encountered.
689     ///
690     /// # Errors
691     ///
692     /// This function may encounter any standard I/O error except `WouldBlock`.
693     ///
694     /// [`readable`]: method@Self::readable
poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>695     pub fn poll_recv_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
696         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
697     }
698 
699     /// Receives a single datagram message on the socket from the remote address
700     /// to which it is connected. On success, returns the number of bytes read.
701     ///
702     /// The function must be called with valid byte array `buf` of sufficient
703     /// size to hold the message bytes. If a message is too long to fit in the
704     /// supplied buffer, excess bytes may be discarded.
705     ///
706     /// The [`connect`] method will connect this socket to a remote address.
707     /// This method will fail if the socket is not connected.
708     ///
709     /// # Cancel safety
710     ///
    /// This method is cancel safe. If `recv` is used as the event in a
    /// [`tokio::select!`](crate::select) statement and some other branch
    /// completes first, it is guaranteed that no messages were received on this
    /// socket.
715     ///
716     /// [`connect`]: method@Self::connect
717     ///
718     /// ```no_run
719     /// use tokio::net::UdpSocket;
720     /// use std::io;
721     ///
722     /// #[tokio::main]
723     /// async fn main() -> io::Result<()> {
724     ///     // Bind socket
725     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
726     ///     socket.connect("127.0.0.1:8081").await?;
727     ///
728     ///     let mut buf = vec![0; 10];
729     ///     let n = socket.recv(&mut buf).await?;
730     ///
731     ///     println!("received {} bytes {:?}", n, &buf[..n]);
732     ///
733     ///     Ok(())
734     /// }
735     /// ```
recv(&self, buf: &mut [u8]) -> io::Result<usize>736     pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
737         self.io
738             .registration()
739             .async_io(Interest::READABLE, || self.io.recv(buf))
740             .await
741     }
742 
743     /// Attempts to receive a single datagram message on the socket from the remote
744     /// address to which it is `connect`ed.
745     ///
746     /// The [`connect`] method will connect this socket to a remote address. This method
747     /// resolves to an error if the socket is not connected.
748     ///
749     /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
750     /// `Waker` from the `Context` passed to the most recent call will be scheduled to
751     /// receive a wakeup.
752     ///
753     /// # Return value
754     ///
755     /// The function returns:
756     ///
757     /// * `Poll::Pending` if the socket is not ready to read
    /// * `Poll::Ready(Ok(()))` if the socket is ready; the received data has been read into the `ReadBuf`
759     /// * `Poll::Ready(Err(e))` if an error is encountered.
760     ///
761     /// # Errors
762     ///
763     /// This function may encounter any standard I/O error except `WouldBlock`.
764     ///
765     /// [`connect`]: method@Self::connect
    pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll<io::Result<()>> {
        // NOTE(review): `ready!` is defined outside this chunk; presumably it
        // returns `Poll::Pending` early when the read is not ready — confirm.
        // The trailing `?` propagates an I/O error out of this function.
        let n = ready!(self.io.registration().poll_read_io(cx, || {
            // Safety: will not read the maybe uninitialized bytes.
            // The cast reinterprets the unfilled `MaybeUninit<u8>` region as a
            // plain `[u8]` so it can be passed to `recv` as the destination.
            let b = unsafe {
                &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
            };

            self.io.recv(b)
        }))?;

        // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
        unsafe {
            buf.assume_init(n);
        }
        // Advance the buffer past the `n` bytes that were just marked
        // initialized above, then report completion.
        buf.advance(n);
        Poll::Ready(Ok(()))
    }
783 
784     /// Tries to receive a single datagram message on the socket from the remote
785     /// address to which it is connected. On success, returns the number of
786     /// bytes read.
787     ///
788     /// The function must be called with valid byte array buf of sufficient size
789     /// to hold the message bytes. If a message is too long to fit in the
790     /// supplied buffer, excess bytes may be discarded.
791     ///
792     /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
793     /// returned. This function is usually paired with `readable()`.
794     ///
795     /// # Examples
796     ///
797     /// ```no_run
798     /// use tokio::net::UdpSocket;
799     /// use std::io;
800     ///
801     /// #[tokio::main]
802     /// async fn main() -> io::Result<()> {
803     ///     // Connect to a peer
804     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
805     ///     socket.connect("127.0.0.1:8081").await?;
806     ///
807     ///     loop {
808     ///         // Wait for the socket to be readable
809     ///         socket.readable().await?;
810     ///
811     ///         // The buffer is **not** included in the async task and will
812     ///         // only exist on the stack.
813     ///         let mut buf = [0; 1024];
814     ///
815     ///         // Try to recv data, this may still fail with `WouldBlock`
816     ///         // if the readiness event is a false positive.
817     ///         match socket.try_recv(&mut buf) {
818     ///             Ok(n) => {
819     ///                 println!("GOT {:?}", &buf[..n]);
820     ///                 break;
821     ///             }
822     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
823     ///                 continue;
824     ///             }
825     ///             Err(e) => {
826     ///                 return Err(e);
827     ///             }
828     ///         }
829     ///     }
830     ///
831     ///     Ok(())
832     /// }
833     /// ```
try_recv(&self, buf: &mut [u8]) -> io::Result<usize>834     pub fn try_recv(&self, buf: &mut [u8]) -> io::Result<usize> {
835         self.io
836             .registration()
837             .try_io(Interest::READABLE, || self.io.recv(buf))
838     }
839 
840     cfg_io_util! {
841         /// Tries to receive data from the stream into the provided buffer, advancing the
842         /// buffer's internal cursor, returning how many bytes were read.
843         ///
844         /// The function must be called with valid byte array buf of sufficient size
845         /// to hold the message bytes. If a message is too long to fit in the
846         /// supplied buffer, excess bytes may be discarded.
847         ///
848         /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
849         /// returned. This function is usually paired with `readable()`.
850         ///
851         /// # Examples
852         ///
853         /// ```no_run
854         /// use tokio::net::UdpSocket;
855         /// use std::io;
856         ///
857         /// #[tokio::main]
858         /// async fn main() -> io::Result<()> {
859         ///     // Connect to a peer
860         ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
861         ///     socket.connect("127.0.0.1:8081").await?;
862         ///
863         ///     loop {
864         ///         // Wait for the socket to be readable
865         ///         socket.readable().await?;
866         ///
867         ///         let mut buf = Vec::with_capacity(1024);
868         ///
869         ///         // Try to recv data, this may still fail with `WouldBlock`
870         ///         // if the readiness event is a false positive.
871         ///         match socket.try_recv_buf(&mut buf) {
872         ///             Ok(n) => {
873         ///                 println!("GOT {:?}", &buf[..n]);
874         ///                 break;
875         ///             }
876         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
877         ///                 continue;
878         ///             }
879         ///             Err(e) => {
880         ///                 return Err(e);
881         ///             }
882         ///         }
883         ///     }
884         ///
885         ///     Ok(())
886         /// }
887         /// ```
888         pub fn try_recv_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
889             self.io.registration().try_io(Interest::READABLE, || {
890                 let dst = buf.chunk_mut();
891                 let dst =
892                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
893 
894                 // Safety: We trust `UdpSocket::recv` to have filled up `n` bytes in the
895                 // buffer.
896                 let n = (&*self.io).recv(dst)?;
897 
898                 unsafe {
899                     buf.advance_mut(n);
900                 }
901 
902                 Ok(n)
903             })
904         }
905 
906         /// Tries to receive a single datagram message on the socket. On success,
907         /// returns the number of bytes read and the origin.
908         ///
909         /// The function must be called with valid byte array buf of sufficient size
910         /// to hold the message bytes. If a message is too long to fit in the
911         /// supplied buffer, excess bytes may be discarded.
912         ///
913         /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
914         /// returned. This function is usually paired with `readable()`.
915         ///
916         /// # Examples
917         ///
918         /// ```no_run
919         /// use tokio::net::UdpSocket;
920         /// use std::io;
921         ///
922         /// #[tokio::main]
923         /// async fn main() -> io::Result<()> {
924         ///     // Connect to a peer
925         ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
926         ///
927         ///     loop {
928         ///         // Wait for the socket to be readable
929         ///         socket.readable().await?;
930         ///
931         ///         let mut buf = Vec::with_capacity(1024);
932         ///
933         ///         // Try to recv data, this may still fail with `WouldBlock`
934         ///         // if the readiness event is a false positive.
935         ///         match socket.try_recv_buf_from(&mut buf) {
936         ///             Ok((n, _addr)) => {
937         ///                 println!("GOT {:?}", &buf[..n]);
938         ///                 break;
939         ///             }
940         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
941         ///                 continue;
942         ///             }
943         ///             Err(e) => {
944         ///                 return Err(e);
945         ///             }
946         ///         }
947         ///     }
948         ///
949         ///     Ok(())
950         /// }
951         /// ```
952         pub fn try_recv_buf_from<B: BufMut>(&self, buf: &mut B) -> io::Result<(usize, SocketAddr)> {
953             self.io.registration().try_io(Interest::READABLE, || {
954                 let dst = buf.chunk_mut();
955                 let dst =
956                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
957 
958                 // Safety: We trust `UdpSocket::recv_from` to have filled up `n` bytes in the
959                 // buffer.
960                 let (n, addr) = (&*self.io).recv_from(dst)?;
961 
962                 unsafe {
963                     buf.advance_mut(n);
964                 }
965 
966                 Ok((n, addr))
967             })
968         }
969     }
970 
971     /// Sends data on the socket to the given address. On success, returns the
972     /// number of bytes written.
973     ///
974     /// Address type can be any implementor of [`ToSocketAddrs`] trait. See its
975     /// documentation for concrete examples.
976     ///
977     /// It is possible for `addr` to yield multiple addresses, but `send_to`
978     /// will only send data to the first address yielded by `addr`.
979     ///
980     /// This will return an error when the IP version of the local socket does
981     /// not match that returned from [`ToSocketAddrs`].
982     ///
983     /// [`ToSocketAddrs`]: crate::net::ToSocketAddrs
984     ///
985     /// # Cancel safety
986     ///
987     /// This method is cancel safe. If `send_to` is used as the event in a
988     /// [`tokio::select!`](crate::select) statement and some other branch
989     /// completes first, then it is guaranteed that the message was not sent.
990     ///
991     /// # Example
992     ///
993     /// ```no_run
994     /// use tokio::net::UdpSocket;
995     /// use std::io;
996     ///
997     /// #[tokio::main]
998     /// async fn main() -> io::Result<()> {
999     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1000     ///     let len = socket.send_to(b"hello world", "127.0.0.1:8081").await?;
1001     ///
1002     ///     println!("Sent {} bytes", len);
1003     ///
1004     ///     Ok(())
1005     /// }
1006     /// ```
send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize>1007     pub async fn send_to<A: ToSocketAddrs>(&self, buf: &[u8], target: A) -> io::Result<usize> {
1008         let mut addrs = to_socket_addrs(target).await?;
1009 
1010         match addrs.next() {
1011             Some(target) => self.send_to_addr(buf, target).await,
1012             None => Err(io::Error::new(
1013                 io::ErrorKind::InvalidInput,
1014                 "no addresses to send data to",
1015             )),
1016         }
1017     }
1018 
1019     /// Attempts to send data on the socket to a given address.
1020     ///
1021     /// Note that on multiple calls to a `poll_*` method in the send direction, only the
1022     /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1023     /// receive a wakeup.
1024     ///
1025     /// # Return value
1026     ///
1027     /// The function returns:
1028     ///
1029     /// * `Poll::Pending` if the socket is not ready to write
1030     /// * `Poll::Ready(Ok(n))` `n` is the number of bytes sent.
1031     /// * `Poll::Ready(Err(e))` if an error is encountered.
1032     ///
1033     /// # Errors
1034     ///
1035     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_send_to( &self, cx: &mut Context<'_>, buf: &[u8], target: SocketAddr, ) -> Poll<io::Result<usize>>1036     pub fn poll_send_to(
1037         &self,
1038         cx: &mut Context<'_>,
1039         buf: &[u8],
1040         target: SocketAddr,
1041     ) -> Poll<io::Result<usize>> {
1042         self.io
1043             .registration()
1044             .poll_write_io(cx, || self.io.send_to(buf, target))
1045     }
1046 
1047     /// Tries to send data on the socket to the given address, but if the send is
1048     /// blocked this will return right away.
1049     ///
1050     /// This function is usually paired with `writable()`.
1051     ///
1052     /// # Returns
1053     ///
1054     /// If successful, returns the number of bytes sent
1055     ///
1056     /// Users should ensure that when the remote cannot receive, the
1057     /// [`ErrorKind::WouldBlock`] is properly handled. An error can also occur
1058     /// if the IP version of the socket does not match that of `target`.
1059     ///
1060     /// [`ErrorKind::WouldBlock`]: std::io::ErrorKind::WouldBlock
1061     ///
1062     /// # Example
1063     ///
1064     /// ```no_run
1065     /// use tokio::net::UdpSocket;
1066     /// use std::error::Error;
1067     /// use std::io;
1068     ///
1069     /// #[tokio::main]
1070     /// async fn main() -> Result<(), Box<dyn Error>> {
1071     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1072     ///
1073     ///     let dst = "127.0.0.1:8081".parse()?;
1074     ///
1075     ///     loop {
1076     ///         socket.writable().await?;
1077     ///
1078     ///         match socket.try_send_to(&b"hello world"[..], dst) {
1079     ///             Ok(sent) => {
1080     ///                 println!("sent {} bytes", sent);
1081     ///                 break;
1082     ///             }
1083     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1084     ///                 // Writable false positive.
1085     ///                 continue;
1086     ///             }
1087     ///             Err(e) => return Err(e.into()),
1088     ///         }
1089     ///     }
1090     ///
1091     ///     Ok(())
1092     /// }
1093     /// ```
try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>1094     pub fn try_send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
1095         self.io
1096             .registration()
1097             .try_io(Interest::WRITABLE, || self.io.send_to(buf, target))
1098     }
1099 
send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>1100     async fn send_to_addr(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
1101         self.io
1102             .registration()
1103             .async_io(Interest::WRITABLE, || self.io.send_to(buf, target))
1104             .await
1105     }
1106 
1107     /// Receives a single datagram message on the socket. On success, returns
1108     /// the number of bytes read and the origin.
1109     ///
1110     /// The function must be called with valid byte array `buf` of sufficient
1111     /// size to hold the message bytes. If a message is too long to fit in the
1112     /// supplied buffer, excess bytes may be discarded.
1113     ///
1114     /// # Cancel safety
1115     ///
1116     /// This method is cancel safe. If `recv_from` is used as the event in a
1117     /// [`tokio::select!`](crate::select) statement and some other branch
1118     /// completes first, it is guaranteed that no messages were received on this
1119     /// socket.
1120     ///
1121     /// # Example
1122     ///
1123     /// ```no_run
1124     /// use tokio::net::UdpSocket;
1125     /// use std::io;
1126     ///
1127     /// #[tokio::main]
1128     /// async fn main() -> io::Result<()> {
1129     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1130     ///
1131     ///     let mut buf = vec![0u8; 32];
1132     ///     let (len, addr) = socket.recv_from(&mut buf).await?;
1133     ///
1134     ///     println!("received {:?} bytes from {:?}", len, addr);
1135     ///
1136     ///     Ok(())
1137     /// }
1138     /// ```
recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1139     pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1140         self.io
1141             .registration()
1142             .async_io(Interest::READABLE, || self.io.recv_from(buf))
1143             .await
1144     }
1145 
1146     /// Attempts to receive a single datagram on the socket.
1147     ///
1148     /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1149     /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1150     /// receive a wakeup.
1151     ///
1152     /// # Return value
1153     ///
1154     /// The function returns:
1155     ///
1156     /// * `Poll::Pending` if the socket is not ready to read
1157     /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1158     /// * `Poll::Ready(Err(e))` if an error is encountered.
1159     ///
1160     /// # Errors
1161     ///
1162     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_recv_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1163     pub fn poll_recv_from(
1164         &self,
1165         cx: &mut Context<'_>,
1166         buf: &mut ReadBuf<'_>,
1167     ) -> Poll<io::Result<SocketAddr>> {
1168         let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1169             // Safety: will not read the maybe uninitialized bytes.
1170             let b = unsafe {
1171                 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1172             };
1173 
1174             self.io.recv_from(b)
1175         }))?;
1176 
1177         // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1178         unsafe {
1179             buf.assume_init(n);
1180         }
1181         buf.advance(n);
1182         Poll::Ready(Ok(addr))
1183     }
1184 
1185     /// Tries to receive a single datagram message on the socket. On success,
1186     /// returns the number of bytes read and the origin.
1187     ///
1188     /// The function must be called with valid byte array buf of sufficient size
1189     /// to hold the message bytes. If a message is too long to fit in the
1190     /// supplied buffer, excess bytes may be discarded.
1191     ///
1192     /// When there is no pending data, `Err(io::ErrorKind::WouldBlock)` is
1193     /// returned. This function is usually paired with `readable()`.
1194     ///
1195     /// # Examples
1196     ///
1197     /// ```no_run
1198     /// use tokio::net::UdpSocket;
1199     /// use std::io;
1200     ///
1201     /// #[tokio::main]
1202     /// async fn main() -> io::Result<()> {
1203     ///     // Connect to a peer
1204     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1205     ///
1206     ///     loop {
1207     ///         // Wait for the socket to be readable
1208     ///         socket.readable().await?;
1209     ///
1210     ///         // The buffer is **not** included in the async task and will
1211     ///         // only exist on the stack.
1212     ///         let mut buf = [0; 1024];
1213     ///
1214     ///         // Try to recv data, this may still fail with `WouldBlock`
1215     ///         // if the readiness event is a false positive.
1216     ///         match socket.try_recv_from(&mut buf) {
1217     ///             Ok((n, _addr)) => {
1218     ///                 println!("GOT {:?}", &buf[..n]);
1219     ///                 break;
1220     ///             }
1221     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1222     ///                 continue;
1223     ///             }
1224     ///             Err(e) => {
1225     ///                 return Err(e);
1226     ///             }
1227     ///         }
1228     ///     }
1229     ///
1230     ///     Ok(())
1231     /// }
1232     /// ```
try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1233     pub fn try_recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1234         self.io
1235             .registration()
1236             .try_io(Interest::READABLE, || self.io.recv_from(buf))
1237     }
1238 
1239     /// Tries to read or write from the socket using a user-provided IO operation.
1240     ///
1241     /// If the socket is ready, the provided closure is called. The closure
1242     /// should attempt to perform IO operation from the socket by manually
1243     /// calling the appropriate syscall. If the operation fails because the
1244     /// socket is not actually ready, then the closure should return a
1245     /// `WouldBlock` error and the readiness flag is cleared. The return value
1246     /// of the closure is then returned by `try_io`.
1247     ///
1248     /// If the socket is not ready, then the closure is not called
1249     /// and a `WouldBlock` error is returned.
1250     ///
1251     /// The closure should only return a `WouldBlock` error if it has performed
1252     /// an IO operation on the socket that failed due to the socket not being
1253     /// ready. Returning a `WouldBlock` error in any other situation will
1254     /// incorrectly clear the readiness flag, which can cause the socket to
1255     /// behave incorrectly.
1256     ///
1257     /// The closure should not perform the IO operation using any of the methods
1258     /// defined on the Tokio `UdpSocket` type, as this will mess with the
1259     /// readiness flag and can cause the socket to behave incorrectly.
1260     ///
1261     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1262     ///
1263     /// [`readable()`]: UdpSocket::readable()
1264     /// [`writable()`]: UdpSocket::writable()
1265     /// [`ready()`]: UdpSocket::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1266     pub fn try_io<R>(
1267         &self,
1268         interest: Interest,
1269         f: impl FnOnce() -> io::Result<R>,
1270     ) -> io::Result<R> {
1271         self.io.registration().try_io(interest, f)
1272     }
1273 
1274     /// Receives data from the socket, without removing it from the input queue.
1275     /// On success, returns the number of bytes read and the address from whence
1276     /// the data came.
1277     ///
1278     /// # Notes
1279     ///
1280     /// On Windows, if the data is larger than the buffer specified, the buffer
1281     /// is filled with the first part of the data, and peek_from returns the error
1282     /// WSAEMSGSIZE(10040). The excess data is lost.
1283     /// Make sure to always use a sufficiently large buffer to hold the
1284     /// maximum UDP packet size, which can be up to 65536 bytes in size.
1285     ///
1286     /// # Examples
1287     ///
1288     /// ```no_run
1289     /// use tokio::net::UdpSocket;
1290     /// use std::io;
1291     ///
1292     /// #[tokio::main]
1293     /// async fn main() -> io::Result<()> {
1294     ///     let socket = UdpSocket::bind("127.0.0.1:8080").await?;
1295     ///
1296     ///     let mut buf = vec![0u8; 32];
1297     ///     let (len, addr) = socket.peek_from(&mut buf).await?;
1298     ///
1299     ///     println!("peeked {:?} bytes from {:?}", len, addr);
1300     ///
1301     ///     Ok(())
1302     /// }
1303     /// ```
peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)>1304     pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
1305         self.io
1306             .registration()
1307             .async_io(Interest::READABLE, || self.io.peek_from(buf))
1308             .await
1309     }
1310 
1311     /// Receives data from the socket, without removing it from the input queue.
1312     /// On success, returns the number of bytes read.
1313     ///
1314     /// # Notes
1315     ///
1316     /// Note that on multiple calls to a `poll_*` method in the recv direction, only the
1317     /// `Waker` from the `Context` passed to the most recent call will be scheduled to
1318     /// receive a wakeup
1319     ///
1320     /// On Windows, if the data is larger than the buffer specified, the buffer
1321     /// is filled with the first part of the data, and peek returns the error
1322     /// WSAEMSGSIZE(10040). The excess data is lost.
1323     /// Make sure to always use a sufficiently large buffer to hold the
1324     /// maximum UDP packet size, which can be up to 65536 bytes in size.
1325     ///
1326     /// # Return value
1327     ///
1328     /// The function returns:
1329     ///
1330     /// * `Poll::Pending` if the socket is not ready to read
1331     /// * `Poll::Ready(Ok(addr))` reads data from `addr` into `ReadBuf` if the socket is ready
1332     /// * `Poll::Ready(Err(e))` if an error is encountered.
1333     ///
1334     /// # Errors
1335     ///
1336     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_peek_from( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<SocketAddr>>1337     pub fn poll_peek_from(
1338         &self,
1339         cx: &mut Context<'_>,
1340         buf: &mut ReadBuf<'_>,
1341     ) -> Poll<io::Result<SocketAddr>> {
1342         let (n, addr) = ready!(self.io.registration().poll_read_io(cx, || {
1343             // Safety: will not read the maybe uninitialized bytes.
1344             let b = unsafe {
1345                 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
1346             };
1347 
1348             self.io.peek_from(b)
1349         }))?;
1350 
1351         // Safety: We trust `recv` to have filled up `n` bytes in the buffer.
1352         unsafe {
1353             buf.assume_init(n);
1354         }
1355         buf.advance(n);
1356         Poll::Ready(Ok(addr))
1357     }
1358 
1359     /// Gets the value of the `SO_BROADCAST` option for this socket.
1360     ///
1361     /// For more information about this option, see [`set_broadcast`].
1362     ///
1363     /// [`set_broadcast`]: method@Self::set_broadcast
broadcast(&self) -> io::Result<bool>1364     pub fn broadcast(&self) -> io::Result<bool> {
1365         self.io.broadcast()
1366     }
1367 
1368     /// Sets the value of the `SO_BROADCAST` option for this socket.
1369     ///
1370     /// When enabled, this socket is allowed to send packets to a broadcast
1371     /// address.
set_broadcast(&self, on: bool) -> io::Result<()>1372     pub fn set_broadcast(&self, on: bool) -> io::Result<()> {
1373         self.io.set_broadcast(on)
1374     }
1375 
1376     /// Gets the value of the `IP_MULTICAST_LOOP` option for this socket.
1377     ///
1378     /// For more information about this option, see [`set_multicast_loop_v4`].
1379     ///
1380     /// [`set_multicast_loop_v4`]: method@Self::set_multicast_loop_v4
multicast_loop_v4(&self) -> io::Result<bool>1381     pub fn multicast_loop_v4(&self) -> io::Result<bool> {
1382         self.io.multicast_loop_v4()
1383     }
1384 
1385     /// Sets the value of the `IP_MULTICAST_LOOP` option for this socket.
1386     ///
1387     /// If enabled, multicast packets will be looped back to the local socket.
1388     ///
1389     /// # Note
1390     ///
1391     /// This may not have any affect on IPv6 sockets.
set_multicast_loop_v4(&self, on: bool) -> io::Result<()>1392     pub fn set_multicast_loop_v4(&self, on: bool) -> io::Result<()> {
1393         self.io.set_multicast_loop_v4(on)
1394     }
1395 
1396     /// Gets the value of the `IP_MULTICAST_TTL` option for this socket.
1397     ///
1398     /// For more information about this option, see [`set_multicast_ttl_v4`].
1399     ///
1400     /// [`set_multicast_ttl_v4`]: method@Self::set_multicast_ttl_v4
multicast_ttl_v4(&self) -> io::Result<u32>1401     pub fn multicast_ttl_v4(&self) -> io::Result<u32> {
1402         self.io.multicast_ttl_v4()
1403     }
1404 
1405     /// Sets the value of the `IP_MULTICAST_TTL` option for this socket.
1406     ///
1407     /// Indicates the time-to-live value of outgoing multicast packets for
1408     /// this socket. The default value is 1 which means that multicast packets
1409     /// don't leave the local network unless explicitly requested.
1410     ///
1411     /// # Note
1412     ///
1413     /// This may not have any affect on IPv6 sockets.
set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()>1414     pub fn set_multicast_ttl_v4(&self, ttl: u32) -> io::Result<()> {
1415         self.io.set_multicast_ttl_v4(ttl)
1416     }
1417 
1418     /// Gets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
1419     ///
1420     /// For more information about this option, see [`set_multicast_loop_v6`].
1421     ///
1422     /// [`set_multicast_loop_v6`]: method@Self::set_multicast_loop_v6
multicast_loop_v6(&self) -> io::Result<bool>1423     pub fn multicast_loop_v6(&self) -> io::Result<bool> {
1424         self.io.multicast_loop_v6()
1425     }
1426 
1427     /// Sets the value of the `IPV6_MULTICAST_LOOP` option for this socket.
1428     ///
1429     /// Controls whether this socket sees the multicast packets it sends itself.
1430     ///
1431     /// # Note
1432     ///
1433     /// This may not have any affect on IPv4 sockets.
set_multicast_loop_v6(&self, on: bool) -> io::Result<()>1434     pub fn set_multicast_loop_v6(&self, on: bool) -> io::Result<()> {
1435         self.io.set_multicast_loop_v6(on)
1436     }
1437 
1438     /// Gets the value of the `IP_TTL` option for this socket.
1439     ///
1440     /// For more information about this option, see [`set_ttl`].
1441     ///
1442     /// [`set_ttl`]: method@Self::set_ttl
1443     ///
1444     /// # Examples
1445     ///
1446     /// ```no_run
1447     /// use tokio::net::UdpSocket;
1448     /// # use std::io;
1449     ///
1450     /// # async fn dox() -> io::Result<()> {
1451     /// let sock = UdpSocket::bind("127.0.0.1:8080").await?;
1452     ///
1453     /// println!("{:?}", sock.ttl()?);
1454     /// # Ok(())
1455     /// # }
1456     /// ```
ttl(&self) -> io::Result<u32>1457     pub fn ttl(&self) -> io::Result<u32> {
1458         self.io.ttl()
1459     }
1460 
1461     /// Sets the value for the `IP_TTL` option on this socket.
1462     ///
1463     /// This value sets the time-to-live field that is used in every packet sent
1464     /// from this socket.
1465     ///
1466     /// # Examples
1467     ///
1468     /// ```no_run
1469     /// use tokio::net::UdpSocket;
1470     /// # use std::io;
1471     ///
1472     /// # async fn dox() -> io::Result<()> {
1473     /// let sock = UdpSocket::bind("127.0.0.1:8080").await?;
1474     /// sock.set_ttl(60)?;
1475     ///
1476     /// # Ok(())
1477     /// # }
1478     /// ```
set_ttl(&self, ttl: u32) -> io::Result<()>1479     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1480         self.io.set_ttl(ttl)
1481     }
1482 
1483     /// Executes an operation of the `IP_ADD_MEMBERSHIP` type.
1484     ///
1485     /// This function specifies a new multicast group for this socket to join.
1486     /// The address must be a valid multicast address, and `interface` is the
1487     /// address of the local interface with which the system should join the
1488     /// multicast group. If it's equal to `INADDR_ANY` then an appropriate
1489     /// interface is chosen by the system.
join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()>1490     pub fn join_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
1491         self.io.join_multicast_v4(&multiaddr, &interface)
1492     }
1493 
1494     /// Executes an operation of the `IPV6_ADD_MEMBERSHIP` type.
1495     ///
1496     /// This function specifies a new multicast group for this socket to join.
1497     /// The address must be a valid multicast address, and `interface` is the
1498     /// index of the interface to join/leave (or 0 to indicate any interface).
join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()>1499     pub fn join_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
1500         self.io.join_multicast_v6(multiaddr, interface)
1501     }
1502 
1503     /// Executes an operation of the `IP_DROP_MEMBERSHIP` type.
1504     ///
1505     /// For more information about this option, see [`join_multicast_v4`].
1506     ///
1507     /// [`join_multicast_v4`]: method@Self::join_multicast_v4
leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()>1508     pub fn leave_multicast_v4(&self, multiaddr: Ipv4Addr, interface: Ipv4Addr) -> io::Result<()> {
1509         self.io.leave_multicast_v4(&multiaddr, &interface)
1510     }
1511 
1512     /// Executes an operation of the `IPV6_DROP_MEMBERSHIP` type.
1513     ///
1514     /// For more information about this option, see [`join_multicast_v6`].
1515     ///
1516     /// [`join_multicast_v6`]: method@Self::join_multicast_v6
leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()>1517     pub fn leave_multicast_v6(&self, multiaddr: &Ipv6Addr, interface: u32) -> io::Result<()> {
1518         self.io.leave_multicast_v6(multiaddr, interface)
1519     }
1520 
1521     /// Returns the value of the `SO_ERROR` option.
1522     ///
1523     /// # Examples
1524     /// ```
1525     /// use tokio::net::UdpSocket;
1526     /// use std::io;
1527     ///
1528     /// #[tokio::main]
1529     /// async fn main() -> io::Result<()> {
1530     ///     // Create a socket
1531     ///     let socket = UdpSocket::bind("0.0.0.0:8080").await?;
1532     ///
1533     ///     if let Ok(Some(err)) = socket.take_error() {
1534     ///         println!("Got error: {:?}", err);
1535     ///     }
1536     ///
1537     ///     Ok(())
1538     /// }
1539     /// ```
take_error(&self) -> io::Result<Option<io::Error>>1540     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
1541         self.io.take_error()
1542     }
1543 }
1544 
1545 impl TryFrom<std::net::UdpSocket> for UdpSocket {
1546     type Error = io::Error;
1547 
1548     /// Consumes stream, returning the tokio I/O object.
1549     ///
1550     /// This is equivalent to
1551     /// [`UdpSocket::from_std(stream)`](UdpSocket::from_std).
try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error>1552     fn try_from(stream: std::net::UdpSocket) -> Result<Self, Self::Error> {
1553         Self::from_std(stream)
1554     }
1555 }
1556 
1557 impl fmt::Debug for UdpSocket {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1558     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1559         self.io.fmt(f)
1560     }
1561 }
1562 
1563 #[cfg(all(unix))]
1564 mod sys {
1565     use super::UdpSocket;
1566     use std::os::unix::prelude::*;
1567 
1568     impl AsRawFd for UdpSocket {
as_raw_fd(&self) -> RawFd1569         fn as_raw_fd(&self) -> RawFd {
1570             self.io.as_raw_fd()
1571         }
1572     }
1573 }
1574 
#[cfg(windows)]
mod sys {
    use super::UdpSocket;
    use std::os::windows::io::{AsRawSocket, RawSocket};

    /// Exposes the raw socket handle of the wrapped socket on Windows targets.
    impl AsRawSocket for UdpSocket {
        fn as_raw_socket(&self) -> RawSocket {
            self.io.as_raw_socket()
        }
    }
}
1586