1 use crate::future::poll_fn; 2 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready}; 3 use crate::net::unix::split::{split, ReadHalf, WriteHalf}; 4 use crate::net::unix::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf}; 5 use crate::net::unix::ucred::{self, UCred}; 6 use crate::net::unix::SocketAddr; 7 8 use std::convert::TryFrom; 9 use std::fmt; 10 use std::io::{self, Read, Write}; 11 use std::net::Shutdown; 12 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; 13 use std::os::unix::net; 14 use std::path::Path; 15 use std::pin::Pin; 16 use std::task::{Context, Poll}; 17 18 cfg_io_util! { 19 use bytes::BufMut; 20 } 21 22 cfg_net_unix! { 23 /// A structure representing a connected Unix socket. 24 /// 25 /// This socket can be connected directly with `UnixStream::connect` or accepted 26 /// from a listener with `UnixListener::incoming`. Additionally, a pair of 27 /// anonymous Unix sockets can be created with `UnixStream::pair`. 28 /// 29 /// To shut down the stream in the write direction, you can call the 30 /// [`shutdown()`] method. This will cause the other peer to receive a read of 31 /// length 0, indicating that no more data will be sent. This only closes 32 /// the stream in one direction. 33 /// 34 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown 35 pub struct UnixStream { 36 io: PollEvented<mio::net::UnixStream>, 37 } 38 } 39 40 impl UnixStream { 41 /// Connects to the socket named by `path`. 42 /// 43 /// This function will create a new Unix socket and connect to the path 44 /// specified, associating the returned stream with the default event loop's 45 /// handle. connect<P>(path: P) -> io::Result<UnixStream> where P: AsRef<Path>,46 pub async fn connect<P>(path: P) -> io::Result<UnixStream> 47 where 48 P: AsRef<Path>, 49 { 50 let stream = mio::net::UnixStream::connect(path)?; 51 let stream = UnixStream::new(stream)?; 52 53 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; 54 Ok(stream) 55 } 56 57 /// Wait for any of the requested ready states. 58 /// 59 /// This function is usually paired with `try_read()` or `try_write()`. It 60 /// can be used to concurrently read / write to the same socket on a single 61 /// task without splitting the socket. 62 /// 63 /// # Examples 64 /// 65 /// Concurrently read and write to the stream on the same task without 66 /// splitting. 67 /// 68 /// ```no_run 69 /// use tokio::io::Interest; 70 /// use tokio::net::UnixStream; 71 /// use std::error::Error; 72 /// use std::io; 73 /// 74 /// #[tokio::main] 75 /// async fn main() -> Result<(), Box<dyn Error>> { 76 /// let dir = tempfile::tempdir().unwrap(); 77 /// let bind_path = dir.path().join("bind_path"); 78 /// let stream = UnixStream::connect(bind_path).await?; 79 /// 80 /// loop { 81 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; 82 /// 83 /// if ready.is_readable() { 84 /// let mut data = vec![0; 1024]; 85 /// // Try to read data, this may still fail with `WouldBlock` 86 /// // if the readiness event is a false positive. 87 /// match stream.try_read(&mut data) { 88 /// Ok(n) => { 89 /// println!("read {} bytes", n); 90 /// } 91 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 92 /// continue; 93 /// } 94 /// Err(e) => { 95 /// return Err(e.into()); 96 /// } 97 /// } 98 /// 99 /// } 100 /// 101 /// if ready.is_writable() { 102 /// // Try to write data, this may still fail with `WouldBlock` 103 /// // if the readiness event is a false positive. 104 /// match stream.try_write(b"hello world") { 105 /// Ok(n) => { 106 /// println!("write {} bytes", n); 107 /// } 108 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 109 /// continue; 110 /// } 111 /// Err(e) => { 112 /// return Err(e.into()); 113 /// } 114 /// } 115 /// } 116 /// } 117 /// } 118 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>119 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 120 let event = self.io.registration().readiness(interest).await?; 121 Ok(event.ready) 122 } 123 124 /// Wait for the socket to become readable. 125 /// 126 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 127 /// paired with `try_read()`. 128 /// 129 /// # Examples 130 /// 131 /// ```no_run 132 /// use tokio::net::UnixStream; 133 /// use std::error::Error; 134 /// use std::io; 135 /// 136 /// #[tokio::main] 137 /// async fn main() -> Result<(), Box<dyn Error>> { 138 /// // Connect to a peer 139 /// let dir = tempfile::tempdir().unwrap(); 140 /// let bind_path = dir.path().join("bind_path"); 141 /// let stream = UnixStream::connect(bind_path).await?; 142 /// 143 /// let mut msg = vec![0; 1024]; 144 /// 145 /// loop { 146 /// // Wait for the socket to be readable 147 /// stream.readable().await?; 148 /// 149 /// // Try to read data, this may still fail with `WouldBlock` 150 /// // if the readiness event is a false positive. 151 /// match stream.try_read(&mut msg) { 152 /// Ok(n) => { 153 /// msg.truncate(n); 154 /// break; 155 /// } 156 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 157 /// continue; 158 /// } 159 /// Err(e) => { 160 /// return Err(e.into()); 161 /// } 162 /// } 163 /// } 164 /// 165 /// println!("GOT = {:?}", msg); 166 /// Ok(()) 167 /// } 168 /// ``` readable(&self) -> io::Result<()>169 pub async fn readable(&self) -> io::Result<()> { 170 self.ready(Interest::READABLE).await?; 171 Ok(()) 172 } 173 174 /// Polls for read readiness. 175 /// 176 /// If the unix stream is not currently ready for reading, this method will 177 /// store a clone of the `Waker` from the provided `Context`. When the unix 178 /// stream becomes ready for reading, `Waker::wake` will be called on the 179 /// waker. 180 /// 181 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only 182 /// the `Waker` from the `Context` passed to the most recent call is 183 /// scheduled to receive a wakeup. (However, `poll_write_ready` retains a 184 /// second, independent waker.) 185 /// 186 /// This function is intended for cases where creating and pinning a future 187 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 188 /// preferred, as this supports polling from multiple tasks at once. 189 /// 190 /// # Return value 191 /// 192 /// The function returns: 193 /// 194 /// * `Poll::Pending` if the unix stream is not ready for reading. 195 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for reading. 196 /// * `Poll::Ready(Err(e))` if an error is encountered. 197 /// 198 /// # Errors 199 /// 200 /// This function may encounter any standard I/O error except `WouldBlock`. 201 /// 202 /// [`readable`]: method@Self::readable poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>203 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 204 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 205 } 206 207 /// Try to read data from the stream into the provided buffer, returning how 208 /// many bytes were read. 209 /// 210 /// Receives any pending data from the socket but does not wait for new data 211 /// to arrive. On success, returns the number of bytes read. Because 212 /// `try_read()` is non-blocking, the buffer does not have to be stored by 213 /// the async task and can exist entirely on the stack. 214 /// 215 /// Usually, [`readable()`] or [`ready()`] is used with this function. 216 /// 217 /// [`readable()`]: UnixStream::readable() 218 /// [`ready()`]: UnixStream::ready() 219 /// 220 /// # Return 221 /// 222 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 223 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 224 /// and will no longer yield data. If the stream is not ready to read data 225 /// `Err(io::ErrorKind::WouldBlock)` is returned. 226 /// 227 /// # Examples 228 /// 229 /// ```no_run 230 /// use tokio::net::UnixStream; 231 /// use std::error::Error; 232 /// use std::io; 233 /// 234 /// #[tokio::main] 235 /// async fn main() -> Result<(), Box<dyn Error>> { 236 /// // Connect to a peer 237 /// let dir = tempfile::tempdir().unwrap(); 238 /// let bind_path = dir.path().join("bind_path"); 239 /// let stream = UnixStream::connect(bind_path).await?; 240 /// 241 /// loop { 242 /// // Wait for the socket to be readable 243 /// stream.readable().await?; 244 /// 245 /// // Creating the buffer **after** the `await` prevents it from 246 /// // being stored in the async task. 247 /// let mut buf = [0; 4096]; 248 /// 249 /// // Try to read data, this may still fail with `WouldBlock` 250 /// // if the readiness event is a false positive. 251 /// match stream.try_read(&mut buf) { 252 /// Ok(0) => break, 253 /// Ok(n) => { 254 /// println!("read {} bytes", n); 255 /// } 256 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 257 /// continue; 258 /// } 259 /// Err(e) => { 260 /// return Err(e.into()); 261 /// } 262 /// } 263 /// } 264 /// 265 /// Ok(()) 266 /// } 267 /// ``` try_read(&self, buf: &mut [u8]) -> io::Result<usize>268 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { 269 self.io 270 .registration() 271 .try_io(Interest::READABLE, || (&*self.io).read(buf)) 272 } 273 274 cfg_io_util! { 275 /// Try to read data from the stream into the provided buffer, advancing the 276 /// buffer's internal cursor, returning how many bytes were read. 277 /// 278 /// Receives any pending data from the socket but does not wait for new data 279 /// to arrive. On success, returns the number of bytes read. Because 280 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by 281 /// the async task and can exist entirely on the stack. 282 /// 283 /// Usually, [`readable()`] or [`ready()`] is used with this function. 284 /// 285 /// [`readable()`]: UnixStream::readable() 286 /// [`ready()`]: UnixStream::ready() 287 /// 288 /// # Return 289 /// 290 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 291 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 292 /// and will no longer yield data. If the stream is not ready to read data 293 /// `Err(io::ErrorKind::WouldBlock)` is returned. 294 /// 295 /// # Examples 296 /// 297 /// ```no_run 298 /// use tokio::net::UnixStream; 299 /// use std::error::Error; 300 /// use std::io; 301 /// 302 /// #[tokio::main] 303 /// async fn main() -> Result<(), Box<dyn Error>> { 304 /// // Connect to a peer 305 /// let dir = tempfile::tempdir().unwrap(); 306 /// let bind_path = dir.path().join("bind_path"); 307 /// let stream = UnixStream::connect(bind_path).await?; 308 /// 309 /// loop { 310 /// // Wait for the socket to be readable 311 /// stream.readable().await?; 312 /// 313 /// let mut buf = Vec::with_capacity(4096); 314 /// 315 /// // Try to read data, this may still fail with `WouldBlock` 316 /// // if the readiness event is a false positive. 317 /// match stream.try_read_buf(&mut buf) { 318 /// Ok(0) => break, 319 /// Ok(n) => { 320 /// println!("read {} bytes", n); 321 /// } 322 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 323 /// continue; 324 /// } 325 /// Err(e) => { 326 /// return Err(e.into()); 327 /// } 328 /// } 329 /// } 330 /// 331 /// Ok(()) 332 /// } 333 /// ``` 334 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { 335 self.io.registration().try_io(Interest::READABLE, || { 336 use std::io::Read; 337 338 let dst = buf.chunk_mut(); 339 let dst = 340 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 341 342 // Safety: We trust `UnixStream::read` to have filled up `n` bytes in the 343 // buffer. 344 let n = (&*self.io).read(dst)?; 345 346 unsafe { 347 buf.advance_mut(n); 348 } 349 350 Ok(n) 351 }) 352 } 353 } 354 355 /// Wait for the socket to become writable. 356 /// 357 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually 358 /// paired with `try_write()`. 359 /// 360 /// # Examples 361 /// 362 /// ```no_run 363 /// use tokio::net::UnixStream; 364 /// use std::error::Error; 365 /// use std::io; 366 /// 367 /// #[tokio::main] 368 /// async fn main() -> Result<(), Box<dyn Error>> { 369 /// // Connect to a peer 370 /// let dir = tempfile::tempdir().unwrap(); 371 /// let bind_path = dir.path().join("bind_path"); 372 /// let stream = UnixStream::connect(bind_path).await?; 373 /// 374 /// loop { 375 /// // Wait for the socket to be writable 376 /// stream.writable().await?; 377 /// 378 /// // Try to write data, this may still fail with `WouldBlock` 379 /// // if the readiness event is a false positive. 380 /// match stream.try_write(b"hello world") { 381 /// Ok(n) => { 382 /// break; 383 /// } 384 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 385 /// continue; 386 /// } 387 /// Err(e) => { 388 /// return Err(e.into()); 389 /// } 390 /// } 391 /// } 392 /// 393 /// Ok(()) 394 /// } 395 /// ``` writable(&self) -> io::Result<()>396 pub async fn writable(&self) -> io::Result<()> { 397 self.ready(Interest::WRITABLE).await?; 398 Ok(()) 399 } 400 401 /// Polls for write readiness. 402 /// 403 /// If the unix stream is not currently ready for writing, this method will 404 /// store a clone of the `Waker` from the provided `Context`. When the unix 405 /// stream becomes ready for writing, `Waker::wake` will be called on the 406 /// waker. 407 /// 408 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only 409 /// the `Waker` from the `Context` passed to the most recent call is 410 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a 411 /// second, independent waker.) 412 /// 413 /// This function is intended for cases where creating and pinning a future 414 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 415 /// preferred, as this supports polling from multiple tasks at once. 416 /// 417 /// # Return value 418 /// 419 /// The function returns: 420 /// 421 /// * `Poll::Pending` if the unix stream is not ready for writing. 422 /// * `Poll::Ready(Ok(()))` if the unix stream is ready for writing. 423 /// * `Poll::Ready(Err(e))` if an error is encountered. 424 /// 425 /// # Errors 426 /// 427 /// This function may encounter any standard I/O error except `WouldBlock`. 428 /// 429 /// [`writable`]: method@Self::writable poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>430 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 431 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 432 } 433 434 /// Try to write a buffer to the stream, returning how many bytes were 435 /// written. 436 /// 437 /// The function will attempt to write the entire contents of `buf`, but 438 /// only part of the buffer may be written. 439 /// 440 /// This function is usually paired with `writable()`. 441 /// 442 /// # Return 443 /// 444 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 445 /// number of bytes written. If the stream is not ready to write data, 446 /// `Err(io::ErrorKind::WouldBlock)` is returned. 447 /// 448 /// # Examples 449 /// 450 /// ```no_run 451 /// use tokio::net::UnixStream; 452 /// use std::error::Error; 453 /// use std::io; 454 /// 455 /// #[tokio::main] 456 /// async fn main() -> Result<(), Box<dyn Error>> { 457 /// // Connect to a peer 458 /// let dir = tempfile::tempdir().unwrap(); 459 /// let bind_path = dir.path().join("bind_path"); 460 /// let stream = UnixStream::connect(bind_path).await?; 461 /// 462 /// loop { 463 /// // Wait for the socket to be writable 464 /// stream.writable().await?; 465 /// 466 /// // Try to write data, this may still fail with `WouldBlock` 467 /// // if the readiness event is a false positive. 468 /// match stream.try_write(b"hello world") { 469 /// Ok(n) => { 470 /// break; 471 /// } 472 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 473 /// continue; 474 /// } 475 /// Err(e) => { 476 /// return Err(e.into()); 477 /// } 478 /// } 479 /// } 480 /// 481 /// Ok(()) 482 /// } 483 /// ``` try_write(&self, buf: &[u8]) -> io::Result<usize>484 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { 485 self.io 486 .registration() 487 .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) 488 } 489 490 /// Creates new `UnixStream` from a `std::os::unix::net::UnixStream`. 491 /// 492 /// This function is intended to be used to wrap a UnixStream from the 493 /// standard library in the Tokio equivalent. The conversion assumes 494 /// nothing about the underlying stream; it is left up to the user to set 495 /// it in non-blocking mode. 496 /// 497 /// # Panics 498 /// 499 /// This function panics if thread-local runtime is not set. 500 /// 501 /// The runtime is usually set implicitly when this function is called 502 /// from a future driven by a tokio runtime, otherwise runtime can be set 503 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. from_std(stream: net::UnixStream) -> io::Result<UnixStream>504 pub fn from_std(stream: net::UnixStream) -> io::Result<UnixStream> { 505 let stream = mio::net::UnixStream::from_std(stream); 506 let io = PollEvented::new(stream)?; 507 508 Ok(UnixStream { io }) 509 } 510 511 /// Turn a [`tokio::net::UnixStream`] into a [`std::os::unix::net::UnixStream`]. 512 /// 513 /// The returned [`std::os::unix::net::UnixStream`] will have nonblocking 514 /// mode set as `true`. Use [`set_nonblocking`] to change the blocking 515 /// mode if needed. 516 /// 517 /// # Examples 518 /// 519 /// ``` 520 /// use std::error::Error; 521 /// use std::io::Read; 522 /// use tokio::net::UnixListener; 523 /// # use tokio::net::UnixStream; 524 /// # use tokio::io::AsyncWriteExt; 525 /// 526 /// #[tokio::main] 527 /// async fn main() -> Result<(), Box<dyn Error>> { 528 /// let dir = tempfile::tempdir().unwrap(); 529 /// let bind_path = dir.path().join("bind_path"); 530 /// 531 /// let mut data = [0u8; 12]; 532 /// let listener = UnixListener::bind(&bind_path)?; 533 /// # let handle = tokio::spawn(async { 534 /// # let mut stream = UnixStream::connect(bind_path).await.unwrap(); 535 /// # stream.write(b"Hello world!").await.unwrap(); 536 /// # }); 537 /// let (tokio_unix_stream, _) = listener.accept().await?; 538 /// let mut std_unix_stream = tokio_unix_stream.into_std()?; 539 /// # handle.await.expect("The task being joined has panicked"); 540 /// std_unix_stream.set_nonblocking(false)?; 541 /// std_unix_stream.read_exact(&mut data)?; 542 /// # assert_eq!(b"Hello world!", &data); 543 /// Ok(()) 544 /// } 545 /// ``` 546 /// [`tokio::net::UnixStream`]: UnixStream 547 /// [`std::os::unix::net::UnixStream`]: std::os::unix::net::UnixStream 548 /// [`set_nonblocking`]: fn@std::os::unix::net::UnixStream::set_nonblocking into_std(self) -> io::Result<std::os::unix::net::UnixStream>549 pub fn into_std(self) -> io::Result<std::os::unix::net::UnixStream> { 550 self.io 551 .into_inner() 552 .map(|io| io.into_raw_fd()) 553 .map(|raw_fd| unsafe { std::os::unix::net::UnixStream::from_raw_fd(raw_fd) }) 554 } 555 556 /// Creates an unnamed pair of connected sockets. 557 /// 558 /// This function will create a pair of interconnected Unix sockets for 559 /// communicating back and forth between one another. Each socket will 560 /// be associated with the default event loop's handle. pair() -> io::Result<(UnixStream, UnixStream)>561 pub fn pair() -> io::Result<(UnixStream, UnixStream)> { 562 let (a, b) = mio::net::UnixStream::pair()?; 563 let a = UnixStream::new(a)?; 564 let b = UnixStream::new(b)?; 565 566 Ok((a, b)) 567 } 568 new(stream: mio::net::UnixStream) -> io::Result<UnixStream>569 pub(crate) fn new(stream: mio::net::UnixStream) -> io::Result<UnixStream> { 570 let io = PollEvented::new(stream)?; 571 Ok(UnixStream { io }) 572 } 573 574 /// Returns the socket address of the local half of this connection. local_addr(&self) -> io::Result<SocketAddr>575 pub fn local_addr(&self) -> io::Result<SocketAddr> { 576 self.io.local_addr().map(SocketAddr) 577 } 578 579 /// Returns the socket address of the remote half of this connection. peer_addr(&self) -> io::Result<SocketAddr>580 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 581 self.io.peer_addr().map(SocketAddr) 582 } 583 584 /// Returns effective credentials of the process which called `connect` or `pair`. peer_cred(&self) -> io::Result<UCred>585 pub fn peer_cred(&self) -> io::Result<UCred> { 586 ucred::get_peer_cred(self) 587 } 588 589 /// Returns the value of the `SO_ERROR` option. take_error(&self) -> io::Result<Option<io::Error>>590 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 591 self.io.take_error() 592 } 593 594 /// Shuts down the read, write, or both halves of this connection. 595 /// 596 /// This function will cause all pending and future I/O calls on the 597 /// specified portions to immediately return with an appropriate value 598 /// (see the documentation of `Shutdown`). shutdown_std(&self, how: Shutdown) -> io::Result<()>599 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> { 600 self.io.shutdown(how) 601 } 602 603 // These lifetime markers also appear in the generated documentation, and make 604 // it more clear that this is a *borrowed* split. 605 #[allow(clippy::needless_lifetimes)] 606 /// Split a `UnixStream` into a read half and a write half, which can be used 607 /// to read and write the stream concurrently. 608 /// 609 /// This method is more efficient than [`into_split`], but the halves cannot be 610 /// moved into independently spawned tasks. 611 /// 612 /// [`into_split`]: Self::into_split() split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)613 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { 614 split(self) 615 } 616 617 /// Splits a `UnixStream` into a read half and a write half, which can be used 618 /// to read and write the stream concurrently. 619 /// 620 /// Unlike [`split`], the owned halves can be moved to separate tasks, however 621 /// this comes at the cost of a heap allocation. 622 /// 623 /// **Note:** Dropping the write half will shut down the write half of the 624 /// stream. This is equivalent to calling [`shutdown()`] on the `UnixStream`. 625 /// 626 /// [`split`]: Self::split() 627 /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)628 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) { 629 split_owned(self) 630 } 631 } 632 633 impl TryFrom<net::UnixStream> for UnixStream { 634 type Error = io::Error; 635 636 /// Consumes stream, returning the tokio I/O object. 637 /// 638 /// This is equivalent to 639 /// [`UnixStream::from_std(stream)`](UnixStream::from_std). try_from(stream: net::UnixStream) -> io::Result<Self>640 fn try_from(stream: net::UnixStream) -> io::Result<Self> { 641 Self::from_std(stream) 642 } 643 } 644 645 impl AsyncRead for UnixStream { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>646 fn poll_read( 647 self: Pin<&mut Self>, 648 cx: &mut Context<'_>, 649 buf: &mut ReadBuf<'_>, 650 ) -> Poll<io::Result<()>> { 651 self.poll_read_priv(cx, buf) 652 } 653 } 654 655 impl AsyncWrite for UnixStream { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>656 fn poll_write( 657 self: Pin<&mut Self>, 658 cx: &mut Context<'_>, 659 buf: &[u8], 660 ) -> Poll<io::Result<usize>> { 661 self.poll_write_priv(cx, buf) 662 } 663 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>664 fn poll_write_vectored( 665 self: Pin<&mut Self>, 666 cx: &mut Context<'_>, 667 bufs: &[io::IoSlice<'_>], 668 ) -> Poll<io::Result<usize>> { 669 self.poll_write_vectored_priv(cx, bufs) 670 } 671 is_write_vectored(&self) -> bool672 fn is_write_vectored(&self) -> bool { 673 true 674 } 675 poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>676 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 677 Poll::Ready(Ok(())) 678 } 679 poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>680 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 681 self.shutdown_std(std::net::Shutdown::Write)?; 682 Poll::Ready(Ok(())) 683 } 684 } 685 686 impl UnixStream { 687 // == Poll IO functions that takes `&self` == 688 // 689 // They are not public because (taken from the doc of `PollEvented`): 690 // 691 // While `PollEvented` is `Sync` (if the underlying I/O type is `Sync`), the 692 // caller must ensure that there are at most two tasks that use a 693 // `PollEvented` instance concurrently. One for reading and one for writing. 694 // While violating this requirement is "safe" from a Rust memory model point 695 // of view, it will result in unexpected behavior in the form of lost 696 // notifications and tasks hanging. 697 poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>698 pub(crate) fn poll_read_priv( 699 &self, 700 cx: &mut Context<'_>, 701 buf: &mut ReadBuf<'_>, 702 ) -> Poll<io::Result<()>> { 703 // Safety: `UnixStream::read` correctly handles reads into uninitialized memory 704 unsafe { self.io.poll_read(cx, buf) } 705 } 706 poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>707 pub(crate) fn poll_write_priv( 708 &self, 709 cx: &mut Context<'_>, 710 buf: &[u8], 711 ) -> Poll<io::Result<usize>> { 712 self.io.poll_write(cx, buf) 713 } 714 poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>715 pub(super) fn poll_write_vectored_priv( 716 &self, 717 cx: &mut Context<'_>, 718 bufs: &[io::IoSlice<'_>], 719 ) -> Poll<io::Result<usize>> { 720 self.io.poll_write_vectored(cx, bufs) 721 } 722 } 723 724 impl fmt::Debug for UnixStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result725 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 726 self.io.fmt(f) 727 } 728 } 729 730 impl AsRawFd for UnixStream { as_raw_fd(&self) -> RawFd731 fn as_raw_fd(&self) -> RawFd { 732 self.io.as_raw_fd() 733 } 734 } 735