• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::future::poll_fn;
2 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
3 use crate::net::unix::split::{split, ReadHalf, WriteHalf};
4 use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
5 use crate::net::unix::ucred::{self, UCred};
6 use crate::net::unix::SocketAddr;
7 
8 use std::fmt;
9 use std::io::{self, Read, Write};
10 use std::net::Shutdown;
11 use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
12 use std::os::unix::net;
13 use std::path::Path;
14 use std::pin::Pin;
15 use std::task::{Context, Poll};
16 
17 cfg_io_util! {
18     use bytes::BufMut;
19 }
20 
21 cfg_net_unix! {
22     /// A structure representing a connected Unix socket.
23     ///
24     /// This socket can be connected directly with [`UnixStream::connect`] or accepted
25     /// from a listener with [`UnixListener::accept`]. Additionally, a pair of
26     /// anonymous Unix sockets can be created with `UnixStream::pair`.
27     ///
28     /// To shut down the stream in the write direction, you can call the
29     /// [`shutdown()`] method. This will cause the other peer to receive a read of
30     /// length 0, indicating that no more data will be sent. This only closes
31     /// the stream in one direction.
32     ///
33     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
34     /// [`UnixListener::accept`]: crate::net::UnixListener::accept
35     #[cfg_attr(docsrs, doc(alias = "uds"))]
36     pub struct UnixStream {
37         io: PollEvented<mio::net::UnixStream>,
38     }
39 }
40 
41 impl UnixStream {
42     /// Connects to the socket named by `path`.
43     ///
44     /// This function will create a new Unix socket and connect to the path
45     /// specified, associating the returned stream with the default event loop's
46     /// handle.
connect<P>(path: P) -> io::Result<UnixStream> where P: AsRef<Path>,47     pub async fn connect<P>(path: P) -> io::Result<UnixStream>
48     where
49         P: AsRef<Path>,
50     {
51         let stream = mio::net::UnixStream::connect(path)?;
52         let stream = UnixStream::new(stream)?;
53 
54         poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
55 
56         if let Some(e) = stream.io.take_error()? {
57             return Err(e);
58         }
59 
60         Ok(stream)
61     }
62 
63     /// Waits for any of the requested ready states.
64     ///
65     /// This function is usually paired with `try_read()` or `try_write()`. It
66     /// can be used to concurrently read / write to the same socket on a single
67     /// task without splitting the socket.
68     ///
69     /// The function may complete without the socket being ready. This is a
70     /// false-positive and attempting an operation will return with
71     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
72     /// [`Ready`] set, so you should always check the returned value and possibly
73     /// wait again if the requested states are not set.
74     ///
75     /// # Cancel safety
76     ///
77     /// This method is cancel safe. Once a readiness event occurs, the method
78     /// will continue to return immediately until the readiness event is
79     /// consumed by an attempt to read or write that fails with `WouldBlock` or
80     /// `Poll::Pending`.
81     ///
82     /// # Examples
83     ///
84     /// Concurrently read and write to the stream on the same task without
85     /// splitting.
86     ///
87     /// ```no_run
88     /// use tokio::io::Interest;
89     /// use tokio::net::UnixStream;
90     /// use std::error::Error;
91     /// use std::io;
92     ///
93     /// #[tokio::main]
94     /// async fn main() -> Result<(), Box<dyn Error>> {
95     ///     let dir = tempfile::tempdir().unwrap();
96     ///     let bind_path = dir.path().join("bind_path");
97     ///     let stream = UnixStream::connect(bind_path).await?;
98     ///
99     ///     loop {
100     ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
101     ///
102     ///         if ready.is_readable() {
103     ///             let mut data = vec![0; 1024];
104     ///             // Try to read data, this may still fail with `WouldBlock`
105     ///             // if the readiness event is a false positive.
106     ///             match stream.try_read(&mut data) {
107     ///                 Ok(n) => {
108     ///                     println!("read {} bytes", n);
109     ///                 }
110     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
111     ///                     continue;
112     ///                 }
113     ///                 Err(e) => {
114     ///                     return Err(e.into());
115     ///                 }
116     ///             }
117     ///
118     ///         }
119     ///
120     ///         if ready.is_writable() {
121     ///             // Try to write data, this may still fail with `WouldBlock`
122     ///             // if the readiness event is a false positive.
123     ///             match stream.try_write(b"hello world") {
124     ///                 Ok(n) => {
125     ///                     println!("write {} bytes", n);
126     ///                 }
127     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
128     ///                     continue;
129     ///                 }
130     ///                 Err(e) => {
131     ///                     return Err(e.into());
132     ///                 }
133     ///             }
134     ///         }
135     ///     }
136     /// }
137     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>138     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
139         let event = self.io.registration().readiness(interest).await?;
140         Ok(event.ready)
141     }
142 
143     /// Waits for the socket to become readable.
144     ///
145     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
146     /// paired with `try_read()`.
147     ///
148     /// # Cancel safety
149     ///
150     /// This method is cancel safe. Once a readiness event occurs, the method
151     /// will continue to return immediately until the readiness event is
152     /// consumed by an attempt to read that fails with `WouldBlock` or
153     /// `Poll::Pending`.
154     ///
155     /// # Examples
156     ///
157     /// ```no_run
158     /// use tokio::net::UnixStream;
159     /// use std::error::Error;
160     /// use std::io;
161     ///
162     /// #[tokio::main]
163     /// async fn main() -> Result<(), Box<dyn Error>> {
164     ///     // Connect to a peer
165     ///     let dir = tempfile::tempdir().unwrap();
166     ///     let bind_path = dir.path().join("bind_path");
167     ///     let stream = UnixStream::connect(bind_path).await?;
168     ///
169     ///     let mut msg = vec![0; 1024];
170     ///
171     ///     loop {
172     ///         // Wait for the socket to be readable
173     ///         stream.readable().await?;
174     ///
175     ///         // Try to read data, this may still fail with `WouldBlock`
176     ///         // if the readiness event is a false positive.
177     ///         match stream.try_read(&mut msg) {
178     ///             Ok(n) => {
179     ///                 msg.truncate(n);
180     ///                 break;
181     ///             }
182     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
183     ///                 continue;
184     ///             }
185     ///             Err(e) => {
186     ///                 return Err(e.into());
187     ///             }
188     ///         }
189     ///     }
190     ///
191     ///     println!("GOT = {:?}", msg);
192     ///     Ok(())
193     /// }
194     /// ```
readable(&self) -> io::Result<()>195     pub async fn readable(&self) -> io::Result<()> {
196         self.ready(Interest::READABLE).await?;
197         Ok(())
198     }
199 
200     /// Polls for read readiness.
201     ///
202     /// If the unix stream is not currently ready for reading, this method will
203     /// store a clone of the `Waker` from the provided `Context`. When the unix
204     /// stream becomes ready for reading, `Waker::wake` will be called on the
205     /// waker.
206     ///
207     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
208     /// the `Waker` from the `Context` passed to the most recent call is
209     /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a
210     /// second, independent waker.)
211     ///
212     /// This function is intended for cases where creating and pinning a future
213     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
214     /// preferred, as this supports polling from multiple tasks at once.
215     ///
216     /// # Return value
217     ///
218     /// The function returns:
219     ///
220     /// * `Poll::Pending` if the unix stream is not ready for reading.
221     /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading.
222     /// * `Poll::Ready(Err(e))` if an error is encountered.
223     ///
224     /// # Errors
225     ///
226     /// This function may encounter any standard I/O error except `WouldBlock`.
227     ///
228     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>229     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
230         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
231     }
232 
233     /// Try to read data from the stream into the provided buffer, returning how
234     /// many bytes were read.
235     ///
236     /// Receives any pending data from the socket but does not wait for new data
237     /// to arrive. On success, returns the number of bytes read. Because
238     /// `try_read()` is non-blocking, the buffer does not have to be stored by
239     /// the async task and can exist entirely on the stack.
240     ///
241     /// Usually, [`readable()`] or [`ready()`] is used with this function.
242     ///
243     /// [`readable()`]: UnixStream::readable()
244     /// [`ready()`]: UnixStream::ready()
245     ///
246     /// # Return
247     ///
248     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
249     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
250     ///
251     /// 1. The stream's read half is closed and will no longer yield data.
252     /// 2. The specified buffer was 0 bytes in length.
253     ///
254     /// If the stream is not ready to read data,
255     /// `Err(io::ErrorKind::WouldBlock)` is returned.
256     ///
257     /// # Examples
258     ///
259     /// ```no_run
260     /// use tokio::net::UnixStream;
261     /// use std::error::Error;
262     /// use std::io;
263     ///
264     /// #[tokio::main]
265     /// async fn main() -> Result<(), Box<dyn Error>> {
266     ///     // Connect to a peer
267     ///     let dir = tempfile::tempdir().unwrap();
268     ///     let bind_path = dir.path().join("bind_path");
269     ///     let stream = UnixStream::connect(bind_path).await?;
270     ///
271     ///     loop {
272     ///         // Wait for the socket to be readable
273     ///         stream.readable().await?;
274     ///
275     ///         // Creating the buffer **after** the `await` prevents it from
276     ///         // being stored in the async task.
277     ///         let mut buf = [0; 4096];
278     ///
279     ///         // Try to read data, this may still fail with `WouldBlock`
280     ///         // if the readiness event is a false positive.
281     ///         match stream.try_read(&mut buf) {
282     ///             Ok(0) => break,
283     ///             Ok(n) => {
284     ///                 println!("read {} bytes", n);
285     ///             }
286     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
287     ///                 continue;
288     ///             }
289     ///             Err(e) => {
290     ///                 return Err(e.into());
291     ///             }
292     ///         }
293     ///     }
294     ///
295     ///     Ok(())
296     /// }
297     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>298     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
299         self.io
300             .registration()
301             .try_io(Interest::READABLE, || (&*self.io).read(buf))
302     }
303 
304     /// Tries to read data from the stream into the provided buffers, returning
305     /// how many bytes were read.
306     ///
307     /// Data is copied to fill each buffer in order, with the final buffer
308     /// written to possibly being only partially filled. This method behaves
309     /// equivalently to a single call to [`try_read()`] with concatenated
310     /// buffers.
311     ///
312     /// Receives any pending data from the socket but does not wait for new data
313     /// to arrive. On success, returns the number of bytes read. Because
314     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
315     /// stored by the async task and can exist entirely on the stack.
316     ///
317     /// Usually, [`readable()`] or [`ready()`] is used with this function.
318     ///
319     /// [`try_read()`]: UnixStream::try_read()
320     /// [`readable()`]: UnixStream::readable()
321     /// [`ready()`]: UnixStream::ready()
322     ///
323     /// # Return
324     ///
325     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
326     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
327     /// and will no longer yield data. If the stream is not ready to read data
328     /// `Err(io::ErrorKind::WouldBlock)` is returned.
329     ///
330     /// # Examples
331     ///
332     /// ```no_run
333     /// use tokio::net::UnixStream;
334     /// use std::error::Error;
335     /// use std::io::{self, IoSliceMut};
336     ///
337     /// #[tokio::main]
338     /// async fn main() -> Result<(), Box<dyn Error>> {
339     ///     // Connect to a peer
340     ///     let dir = tempfile::tempdir().unwrap();
341     ///     let bind_path = dir.path().join("bind_path");
342     ///     let stream = UnixStream::connect(bind_path).await?;
343     ///
344     ///     loop {
345     ///         // Wait for the socket to be readable
346     ///         stream.readable().await?;
347     ///
348     ///         // Creating the buffer **after** the `await` prevents it from
349     ///         // being stored in the async task.
350     ///         let mut buf_a = [0; 512];
351     ///         let mut buf_b = [0; 1024];
352     ///         let mut bufs = [
353     ///             IoSliceMut::new(&mut buf_a),
354     ///             IoSliceMut::new(&mut buf_b),
355     ///         ];
356     ///
357     ///         // Try to read data, this may still fail with `WouldBlock`
358     ///         // if the readiness event is a false positive.
359     ///         match stream.try_read_vectored(&mut bufs) {
360     ///             Ok(0) => break,
361     ///             Ok(n) => {
362     ///                 println!("read {} bytes", n);
363     ///             }
364     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
365     ///                 continue;
366     ///             }
367     ///             Err(e) => {
368     ///                 return Err(e.into());
369     ///             }
370     ///         }
371     ///     }
372     ///
373     ///     Ok(())
374     /// }
375     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>376     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
377         self.io
378             .registration()
379             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
380     }
381 
382     cfg_io_util! {
383         /// Tries to read data from the stream into the provided buffer, advancing the
384         /// buffer's internal cursor, returning how many bytes were read.
385         ///
386         /// Receives any pending data from the socket but does not wait for new data
387         /// to arrive. On success, returns the number of bytes read. Because
388         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
389         /// the async task and can exist entirely on the stack.
390         ///
391         /// Usually, [`readable()`] or [`ready()`] is used with this function.
392         ///
393         /// [`readable()`]: UnixStream::readable()
394         /// [`ready()`]: UnixStream::ready()
395         ///
396         /// # Return
397         ///
398         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
399         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
400         /// and will no longer yield data. If the stream is not ready to read data
401         /// `Err(io::ErrorKind::WouldBlock)` is returned.
402         ///
403         /// # Examples
404         ///
405         /// ```no_run
406         /// use tokio::net::UnixStream;
407         /// use std::error::Error;
408         /// use std::io;
409         ///
410         /// #[tokio::main]
411         /// async fn main() -> Result<(), Box<dyn Error>> {
412         ///     // Connect to a peer
413         ///     let dir = tempfile::tempdir().unwrap();
414         ///     let bind_path = dir.path().join("bind_path");
415         ///     let stream = UnixStream::connect(bind_path).await?;
416         ///
417         ///     loop {
418         ///         // Wait for the socket to be readable
419         ///         stream.readable().await?;
420         ///
421         ///         let mut buf = Vec::with_capacity(4096);
422         ///
423         ///         // Try to read data, this may still fail with `WouldBlock`
424         ///         // if the readiness event is a false positive.
425         ///         match stream.try_read_buf(&mut buf) {
426         ///             Ok(0) => break,
427         ///             Ok(n) => {
428         ///                 println!("read {} bytes", n);
429         ///             }
430         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
431         ///                 continue;
432         ///             }
433         ///             Err(e) => {
434         ///                 return Err(e.into());
435         ///             }
436         ///         }
437         ///     }
438         ///
439         ///     Ok(())
440         /// }
441         /// ```
442         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
443             self.io.registration().try_io(Interest::READABLE, || {
444                 use std::io::Read;
445 
446                 let dst = buf.chunk_mut();
447                 let dst =
448                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
449 
450                 // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the
451                 // buffer.
452                 let n = (&*self.io).read(dst)?;
453 
454                 unsafe {
455                     buf.advance_mut(n);
456                 }
457 
458                 Ok(n)
459             })
460         }
461     }
462 
463     /// Waits for the socket to become writable.
464     ///
465     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
466     /// paired with `try_write()`.
467     ///
468     /// # Cancel safety
469     ///
470     /// This method is cancel safe. Once a readiness event occurs, the method
471     /// will continue to return immediately until the readiness event is
472     /// consumed by an attempt to write that fails with `WouldBlock` or
473     /// `Poll::Pending`.
474     ///
475     /// # Examples
476     ///
477     /// ```no_run
478     /// use tokio::net::UnixStream;
479     /// use std::error::Error;
480     /// use std::io;
481     ///
482     /// #[tokio::main]
483     /// async fn main() -> Result<(), Box<dyn Error>> {
484     ///     // Connect to a peer
485     ///     let dir = tempfile::tempdir().unwrap();
486     ///     let bind_path = dir.path().join("bind_path");
487     ///     let stream = UnixStream::connect(bind_path).await?;
488     ///
489     ///     loop {
490     ///         // Wait for the socket to be writable
491     ///         stream.writable().await?;
492     ///
493     ///         // Try to write data, this may still fail with `WouldBlock`
494     ///         // if the readiness event is a false positive.
495     ///         match stream.try_write(b"hello world") {
496     ///             Ok(n) => {
497     ///                 break;
498     ///             }
499     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
500     ///                 continue;
501     ///             }
502     ///             Err(e) => {
503     ///                 return Err(e.into());
504     ///             }
505     ///         }
506     ///     }
507     ///
508     ///     Ok(())
509     /// }
510     /// ```
writable(&self) -> io::Result<()>511     pub async fn writable(&self) -> io::Result<()> {
512         self.ready(Interest::WRITABLE).await?;
513         Ok(())
514     }
515 
516     /// Polls for write readiness.
517     ///
518     /// If the unix stream is not currently ready for writing, this method will
519     /// store a clone of the `Waker` from the provided `Context`. When the unix
520     /// stream becomes ready for writing, `Waker::wake` will be called on the
521     /// waker.
522     ///
523     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
524     /// the `Waker` from the `Context` passed to the most recent call is
525     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
526     /// second, independent waker.)
527     ///
528     /// This function is intended for cases where creating and pinning a future
529     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
530     /// preferred, as this supports polling from multiple tasks at once.
531     ///
532     /// # Return value
533     ///
534     /// The function returns:
535     ///
536     /// * `Poll::Pending` if the unix stream is not ready for writing.
537     /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing.
538     /// * `Poll::Ready(Err(e))` if an error is encountered.
539     ///
540     /// # Errors
541     ///
542     /// This function may encounter any standard I/O error except `WouldBlock`.
543     ///
544     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>545     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
546         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
547     }
548 
549     /// Tries to write a buffer to the stream, returning how many bytes were
550     /// written.
551     ///
552     /// The function will attempt to write the entire contents of `buf`, but
553     /// only part of the buffer may be written.
554     ///
555     /// This function is usually paired with `writable()`.
556     ///
557     /// # Return
558     ///
559     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
560     /// number of bytes written. If the stream is not ready to write data,
561     /// `Err(io::ErrorKind::WouldBlock)` is returned.
562     ///
563     /// # Examples
564     ///
565     /// ```no_run
566     /// use tokio::net::UnixStream;
567     /// use std::error::Error;
568     /// use std::io;
569     ///
570     /// #[tokio::main]
571     /// async fn main() -> Result<(), Box<dyn Error>> {
572     ///     // Connect to a peer
573     ///     let dir = tempfile::tempdir().unwrap();
574     ///     let bind_path = dir.path().join("bind_path");
575     ///     let stream = UnixStream::connect(bind_path).await?;
576     ///
577     ///     loop {
578     ///         // Wait for the socket to be writable
579     ///         stream.writable().await?;
580     ///
581     ///         // Try to write data, this may still fail with `WouldBlock`
582     ///         // if the readiness event is a false positive.
583     ///         match stream.try_write(b"hello world") {
584     ///             Ok(n) => {
585     ///                 break;
586     ///             }
587     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
588     ///                 continue;
589     ///             }
590     ///             Err(e) => {
591     ///                 return Err(e.into());
592     ///             }
593     ///         }
594     ///     }
595     ///
596     ///     Ok(())
597     /// }
598     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>599     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
600         self.io
601             .registration()
602             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
603     }
604 
605     /// Tries to write several buffers to the stream, returning how many bytes
606     /// were written.
607     ///
608     /// Data is written from each buffer in order, with the final buffer read
609     /// from possibly being only partially consumed. This method behaves
610     /// equivalently to a single call to [`try_write()`] with concatenated
611     /// buffers.
612     ///
613     /// This function is usually paired with `writable()`.
614     ///
615     /// [`try_write()`]: UnixStream::try_write()
616     ///
617     /// # Return
618     ///
619     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
620     /// number of bytes written. If the stream is not ready to write data,
621     /// `Err(io::ErrorKind::WouldBlock)` is returned.
622     ///
623     /// # Examples
624     ///
625     /// ```no_run
626     /// use tokio::net::UnixStream;
627     /// use std::error::Error;
628     /// use std::io;
629     ///
630     /// #[tokio::main]
631     /// async fn main() -> Result<(), Box<dyn Error>> {
632     ///     // Connect to a peer
633     ///     let dir = tempfile::tempdir().unwrap();
634     ///     let bind_path = dir.path().join("bind_path");
635     ///     let stream = UnixStream::connect(bind_path).await?;
636     ///
637     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
638     ///
639     ///     loop {
640     ///         // Wait for the socket to be writable
641     ///         stream.writable().await?;
642     ///
643     ///         // Try to write data, this may still fail with `WouldBlock`
644     ///         // if the readiness event is a false positive.
645     ///         match stream.try_write_vectored(&bufs) {
646     ///             Ok(n) => {
647     ///                 break;
648     ///             }
649     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
650     ///                 continue;
651     ///             }
652     ///             Err(e) => {
653     ///                 return Err(e.into());
654     ///             }
655     ///         }
656     ///     }
657     ///
658     ///     Ok(())
659     /// }
660     /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>661     pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
662         self.io
663             .registration()
664             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
665     }
666 
667     /// Tries to read or write from the socket using a user-provided IO operation.
668     ///
669     /// If the socket is ready, the provided closure is called. The closure
670     /// should attempt to perform an IO operation on the socket by manually
671     /// calling the appropriate syscall. If the operation fails because the
672     /// socket is not actually ready, then the closure should return a
673     /// `WouldBlock` error and the readiness flag is cleared. The return value
674     /// of the closure is then returned by `try_io`.
675     ///
676     /// If the socket is not ready, then the closure is not called
677     /// and a `WouldBlock` error is returned.
678     ///
679     /// The closure should only return a `WouldBlock` error if it has performed
680     /// an IO operation on the socket that failed due to the socket not being
681     /// ready. Returning a `WouldBlock` error in any other situation will
682     /// incorrectly clear the readiness flag, which can cause the socket to
683     /// behave incorrectly.
684     ///
685     /// The closure should not perform the IO operation using any of the methods
686     /// defined on the Tokio `UnixStream` type, as this will mess with the
687     /// readiness flag and can cause the socket to behave incorrectly.
688     ///
689     /// This method is not intended to be used with combined interests.
690     /// The closure should perform only one type of IO operation, so it should not
691     /// require more than one ready state. This method may panic or sleep forever
692     /// if it is called with a combined interest.
693     ///
694     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
695     ///
696     /// [`readable()`]: UnixStream::readable()
697     /// [`writable()`]: UnixStream::writable()
698     /// [`ready()`]: UnixStream::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>699     pub fn try_io<R>(
700         &self,
701         interest: Interest,
702         f: impl FnOnce() -> io::Result<R>,
703     ) -> io::Result<R> {
704         self.io
705             .registration()
706             .try_io(interest, || self.io.try_io(f))
707     }
708 
709     /// Reads or writes from the socket using a user-provided IO operation.
710     ///
711     /// The readiness of the socket is awaited and when the socket is ready,
712     /// the provided closure is called. The closure should attempt to perform
713     /// an IO operation on the socket by manually calling the appropriate syscall.
714     /// If the operation fails because the socket is not actually ready,
715     /// then the closure should return a `WouldBlock` error. In such case the
716     /// readiness flag is cleared and the socket readiness is awaited again.
717     /// This loop is repeated until the closure returns an `Ok` or an error
718     /// other than `WouldBlock`.
719     ///
720     /// The closure should only return a `WouldBlock` error if it has performed
721     /// an IO operation on the socket that failed due to the socket not being
722     /// ready. Returning a `WouldBlock` error in any other situation will
723     /// incorrectly clear the readiness flag, which can cause the socket to
724     /// behave incorrectly.
725     ///
726     /// The closure should not perform the IO operation using any of the methods
727     /// defined on the Tokio `UnixStream` type, as this will mess with the
728     /// readiness flag and can cause the socket to behave incorrectly.
729     ///
730     /// This method is not intended to be used with combined interests.
731     /// The closure should perform only one type of IO operation, so it should not
732     /// require more than one ready state. This method may panic or sleep forever
733     /// if it is called with a combined interest.
async_io<R>( &self, interest: Interest, mut f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>734     pub async fn async_io<R>(
735         &self,
736         interest: Interest,
737         mut f: impl FnMut() -> io::Result<R>,
738     ) -> io::Result<R> {
739         self.io
740             .registration()
741             .async_io(interest, || self.io.try_io(&mut f))
742             .await
743     }
744 
745     /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`.
746     ///
747     /// This function is intended to be used to wrap a UnixStream from the
748     /// standard library in the Tokio equivalent.
749     ///
750     /// # Notes
751     ///
752     /// The caller is responsible for ensuring that the stream is in
753     /// non-blocking mode. Otherwise all I/O operations on the stream
754     /// will block the thread, which will cause unexpected behavior.
755     /// Non-blocking mode can be set using [`set_nonblocking`].
756     ///
757     /// [`set_nonblocking`]: std::os::unix::net::UnixStream::set_nonblocking
758     ///
759     /// # Examples
760     ///
761     /// ```no_run
762     /// use tokio::net::UnixStream;
763     /// use std::os::unix::net::UnixStream as StdUnixStream;
764     /// # use std::error::Error;
765     ///
766     /// # async fn dox() -> Result<(), Box<dyn Error>> {
767     /// let std_stream = StdUnixStream::connect("/path/to/the/socket")?;
768     /// std_stream.set_nonblocking(true)?;
769     /// let stream = UnixStream::from_std(std_stream)?;
770     /// # Ok(())
771     /// # }
772     /// ```
773     ///
774     /// # Panics
775     ///
776     /// This function panics if it is not called from within a runtime with
777     /// IO enabled.
778     ///
779     /// The runtime is usually set implicitly when this function is called
780     /// from a future driven by a tokio runtime, otherwise runtime can be set
781     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
782     #[track_caller]
from_std(stream: net::UnixStream) -> io::Result<UnixStream>783     pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> {
784         let stream = mio::net::UnixStream::from_std(stream);
785         let io = PollEvented::new(stream)?;
786 
787         Ok(UnixStream { io })
788     }
789 
790     /// Turns a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`].
791     ///
792     /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking
793     /// mode set as `true`.  Use [`set_nonblocking`] to change the blocking
794     /// mode if needed.
795     ///
796     /// # Examples
797     ///
798     /// ```
799     /// use std::error::Error;
800     /// use std::io::Read;
801     /// use tokio::net::UnixListener;
802     /// # use tokio::net::UnixStream;
803     /// # use tokio::io::AsyncWriteExt;
804     ///
805     /// #[tokio::main]
806     /// async fn main() -> Result<(), Box<dyn Error>> {
807     ///     let dir = tempfile::tempdir().unwrap();
808     ///     let bind_path = dir.path().join("bind_path");
809     ///
810     ///     let mut data = [0u8; 12];
811     ///     let listener = UnixListener::bind(&bind_path)?;
812     /// #   let handle = tokio::spawn(async {
813     /// #       let mut stream = UnixStream::connect(bind_path).await.unwrap();
814     /// #       stream.write(b"Hello world!").await.unwrap();
815     /// #   });
816     ///     let (tokio_unix_stream, _) = listener.accept().await?;
817     ///     let mut std_unix_stream = tokio_unix_stream.into_std()?;
818     /// #   handle.await.expect("The task being joined has panicked");
819     ///     std_unix_stream.set_nonblocking(false)?;
820     ///     std_unix_stream.read_exact(&mut data)?;
821     /// #   assert_eq!(b"Hello world!", &data);
822     ///     Ok(())
823     /// }
824     /// ```
825     /// [`tokio::net::UnixStream`]: UnixStream
826     /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream
827     /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking
into_std(self) -> io::Result<std::os::unix::net::UnixStream>828     pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> {
829         self.io
830             .into_inner()
831             .map(|io| io.into_raw_fd())
832             .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) })
833     }
834 
835     /// Creates an unnamed pair of connected sockets.
836     ///
837     /// This function will create a pair of interconnected Unix sockets for
838     /// communicating back and forth between one another. Each socket will
839     /// be associated with the default event loop's handle.
pair() -> io::Result<(UnixStream, UnixStream)>840     pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
841         let (a, b) = mio::net::UnixStream::pair()?;
842         let a = UnixStream::new(a)?;
843         let b = UnixStream::new(b)?;
844 
845         Ok((a, b))
846     }
847 
new(stream: mio::net::UnixStream) -> io::Result<UnixStream>848     pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> {
849         let io = PollEvented::new(stream)?;
850         Ok(UnixStream { io })
851     }
852 
853     /// Returns the socket address of the local half of this connection.
854     ///
855     /// # Examples
856     ///
857     /// ```no_run
858     /// use tokio::net::UnixStream;
859     ///
860     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
861     /// let dir = tempfile::tempdir().unwrap();
862     /// let bind_path = dir.path().join("bind_path");
863     /// let stream = UnixStream::connect(bind_path).await?;
864     ///
865     /// println!("{:?}", stream.local_addr()?);
866     /// # Ok(())
867     /// # }
868     /// ```
local_addr(&self) -> io::Result<SocketAddr>869     pub fn local_addr(&self) -> io::Result<SocketAddr> {
870         self.io.local_addr().map(SocketAddr)
871     }
872 
873     /// Returns the socket address of the remote half of this connection.
874     ///
875     /// # Examples
876     ///
877     /// ```no_run
878     /// use tokio::net::UnixStream;
879     ///
880     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
881     /// let dir = tempfile::tempdir().unwrap();
882     /// let bind_path = dir.path().join("bind_path");
883     /// let stream = UnixStream::connect(bind_path).await?;
884     ///
885     /// println!("{:?}", stream.peer_addr()?);
886     /// # Ok(())
887     /// # }
888     /// ```
peer_addr(&self) -> io::Result<SocketAddr>889     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
890         self.io.peer_addr().map(SocketAddr)
891     }
892 
893     /// Returns effective credentials of the process which called `connect` or `pair`.
peer_cred(&self) -> io::Result<UCred>894     pub fn peer_cred(&self) -> io::Result<UCred> {
895         ucred::get_peer_cred(self)
896     }
897 
898     /// Returns the value of the `SO_ERROR` option.
take_error(&self) -> io::Result<Option<io::Error>>899     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
900         self.io.take_error()
901     }
902 
903     /// Shuts down the read, write, or both halves of this connection.
904     ///
905     /// This function will cause all pending and future I/O calls on the
906     /// specified portions to immediately return with an appropriate value
907     /// (see the documentation of `Shutdown`).
shutdown_std(&self, how: Shutdown) -> io::Result<()>908     pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
909         self.io.shutdown(how)
910     }
911 
912     // These lifetime markers also appear in the generated documentation, and make
913     // it more clear that this is a *borrowed* split.
914     #[allow(clippy::needless_lifetimes)]
915     /// Splits a `UnixStream` into a read half and a write half, which can be used
916     /// to read and write the stream concurrently.
917     ///
918     /// This method is more efficient than [`into_split`], but the halves cannot be
919     /// moved into independently spawned tasks.
920     ///
921     /// [`into_split`]: Self::into_split()
split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)922     pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
923         split(self)
924     }
925 
926     /// Splits a `UnixStream` into a read half and a write half, which can be used
927     /// to read and write the stream concurrently.
928     ///
929     /// Unlike [`split`], the owned halves can be moved to separate tasks, however
930     /// this comes at the cost of a heap allocation.
931     ///
932     /// **Note:** Dropping the write half will shut down the write half of the
933     /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`.
934     ///
935     /// [`split`]: Self::split()
936     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)937     pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
938         split_owned(self)
939     }
940 }
941 
942 impl TryFrom<net::UnixStream> for UnixStream {
943     type Error = io::Error;
944 
945     /// Consumes stream, returning the tokio I/O object.
946     ///
947     /// This is equivalent to
948     /// [`UnixStream::from_std(stream)`](UnixStream::from_std).
try_from(stream: net::UnixStream) -> io::Result<Self>949     fn try_from(stream: net::UnixStream) -> io::Result<Self> {
950         Self::from_std(stream)
951     }
952 }
953 
954 impl AsyncRead for UnixStream {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>955     fn poll_read(
956         self: Pin<&mut Self>,
957         cx: &mut Context<'_>,
958         buf: &mut ReadBuf<'_>,
959     ) -> Poll<io::Result<()>> {
960         self.poll_read_priv(cx, buf)
961     }
962 }
963 
964 impl AsyncWrite for UnixStream {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>965     fn poll_write(
966         self: Pin<&mut Self>,
967         cx: &mut Context<'_>,
968         buf: &[u8],
969     ) -> Poll<io::Result<usize>> {
970         self.poll_write_priv(cx, buf)
971     }
972 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>973     fn poll_write_vectored(
974         self: Pin<&mut Self>,
975         cx: &mut Context<'_>,
976         bufs: &[io::IoSlice<'_>],
977     ) -> Poll<io::Result<usize>> {
978         self.poll_write_vectored_priv(cx, bufs)
979     }
980 
is_write_vectored(&self) -> bool981     fn is_write_vectored(&self) -> bool {
982         true
983     }
984 
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>985     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
986         Poll::Ready(Ok(()))
987     }
988 
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>989     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
990         self.shutdown_std(std::net::Shutdown::Write)?;
991         Poll::Ready(Ok(()))
992     }
993 }
994 
995 impl UnixStream {
996     // == Poll IO functions that takes `&self` ==
997     //
998     // To read or write without mutable access to the `UnixStream`, combine the
999     // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1000     // `try_write` methods.
1001 
poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1002     pub(crate) fn poll_read_priv(
1003         &self,
1004         cx: &mut Context<'_>,
1005         buf: &mut ReadBuf<'_>,
1006     ) -> Poll<io::Result<()>> {
1007         // Safety: `UnixStream::read` correctly handles reads into uninitialized memory
1008         unsafe { self.io.poll_read(cx, buf) }
1009     }
1010 
poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1011     pub(crate) fn poll_write_priv(
1012         &self,
1013         cx: &mut Context<'_>,
1014         buf: &[u8],
1015     ) -> Poll<io::Result<usize>> {
1016         self.io.poll_write(cx, buf)
1017     }
1018 
poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1019     pub(super) fn poll_write_vectored_priv(
1020         &self,
1021         cx: &mut Context<'_>,
1022         bufs: &[io::IoSlice<'_>],
1023     ) -> Poll<io::Result<usize>> {
1024         self.io.poll_write_vectored(cx, bufs)
1025     }
1026 }
1027 
1028 impl fmt::Debug for UnixStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1029     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1030         self.io.fmt(f)
1031     }
1032 }
1033 
1034 impl AsRawFd for UnixStream {
as_raw_fd(&self) -> RawFd1035     fn as_raw_fd(&self) -> RawFd {
1036         self.io.as_raw_fd()
1037     }
1038 }
1039 
1040 impl AsFd for UnixStream {
as_fd(&self) -> BorrowedFd<'_>1041     fn as_fd(&self) -> BorrowedFd<'_> {
1042         unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1043     }
1044 }
1045