cfg_not_wasi! {
    use crate::future::poll_fn;
    use crate::net::{to_socket_addrs, ToSocketAddrs};
    use std::time::Duration;
}

use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};

use std::fmt;
use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_io_util! {
    use bytes::BufMut;
}

cfg_net! {
    /// A TCP stream between a local and a remote socket.
    ///
    /// A TCP stream can either be created by connecting to an endpoint, via the
    /// [`connect`] method, or by [accepting] a connection from a [listener]. A
    /// TCP stream can also be created via the [`TcpSocket`] type.
    ///
    /// Reading and writing to a `TcpStream` is usually done using the
    /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
    /// traits.
    ///
    /// [`connect`]: method@TcpStream::connect
    /// [accepting]: method@crate::net::TcpListener::accept
    /// [listener]: struct@crate::net::TcpListener
    /// [`TcpSocket`]: struct@crate::net::TcpSocket
    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::net::TcpStream;
    /// use tokio::io::AsyncWriteExt;
    /// use std::error::Error;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn Error>> {
    ///     // Connect to a peer
    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    ///
    ///     // Write some data.
    ///     stream.write_all(b"hello world!").await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
    ///
    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
    ///
    /// To shut down the stream in the write direction, you can call the
    /// [`shutdown()`] method. This will cause the other peer to receive a read of
    /// length 0, indicating that no more data will be sent. This only closes
    /// the stream in one direction.
    ///
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    pub struct TcpStream {
        // The underlying mio TCP stream, wrapped in `PollEvented`.
        io: PollEvented<mio::net::TcpStream>,
    }
}

impl TcpStream {
    cfg_not_wasi! {
        /// Opens a TCP connection to a remote host.
        ///
        /// `addr` is an address of the remote host. Anything which implements the
        /// [`ToSocketAddrs`] trait can be supplied as the address.  If `addr`
        /// yields multiple addresses, connect will be attempted with each of the
        /// addresses until a connection is successful. If none of the addresses
        /// result in a successful connection, the error returned from the last
        /// connection attempt (the last address) is returned.
        ///
        /// To configure the socket before connecting, you can use the [`TcpSocket`]
        /// type.
        ///
        /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
        /// [`TcpSocket`]: struct@crate::net::TcpSocket
        ///
        /// # Examples
        ///
        /// ```no_run
        /// use tokio::net::TcpStream;
        /// use tokio::io::AsyncWriteExt;
        /// use std::error::Error;
        ///
        /// #[tokio::main]
        /// async fn main() -> Result<(), Box<dyn Error>> {
        ///     // Connect to a peer
        ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
        ///
        ///     // Write some data.
        ///     stream.write_all(b"hello world!").await?;
        ///
        ///     Ok(())
        /// }
        /// ```
        ///
        /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111 /// 112 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 113 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt 114 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> { 115 let addrs = to_socket_addrs(addr).await?; 116 117 let mut last_err = None; 118 119 for addr in addrs { 120 match TcpStream::connect_addr(addr).await { 121 Ok(stream) => return Ok(stream), 122 Err(e) => last_err = Some(e), 123 } 124 } 125 126 Err(last_err.unwrap_or_else(|| { 127 io::Error::new( 128 io::ErrorKind::InvalidInput, 129 "could not resolve to any address", 130 ) 131 })) 132 } 133 134 /// Establishes a connection to the specified `addr`. 135 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> { 136 let sys = mio::net::TcpStream::connect(addr)?; 137 TcpStream::connect_mio(sys).await 138 } 139 140 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> { 141 let stream = TcpStream::new(sys)?; 142 143 // Once we've connected, wait for the stream to be writable as 144 // that's when the actual connection has been initiated. Once we're 145 // writable we check for `take_socket_error` to see if the connect 146 // actually hit an error or not. 147 // 148 // If all that succeeded then we ship everything on up. 149 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; 150 151 if let Some(e) = stream.io.take_error()? { 152 return Err(e); 153 } 154 155 Ok(stream) 156 } 157 } 158 new(connected: mio::net::TcpStream) -> io::Result<TcpStream>159 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> { 160 let io = PollEvented::new(connected)?; 161 Ok(TcpStream { io }) 162 } 163 164 /// Creates new `TcpStream` from a `std::net::TcpStream`. 165 /// 166 /// This function is intended to be used to wrap a TCP stream from the 167 /// standard library in the Tokio equivalent. 
168 /// 169 /// # Notes 170 /// 171 /// The caller is responsible for ensuring that the stream is in 172 /// non-blocking mode. Otherwise all I/O operations on the stream 173 /// will block the thread, which will cause unexpected behavior. 174 /// Non-blocking mode can be set using [`set_nonblocking`]. 175 /// 176 /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking 177 /// 178 /// # Examples 179 /// 180 /// ```rust,no_run 181 /// use std::error::Error; 182 /// use tokio::net::TcpStream; 183 /// 184 /// #[tokio::main] 185 /// async fn main() -> Result<(), Box<dyn Error>> { 186 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; 187 /// std_stream.set_nonblocking(true)?; 188 /// let stream = TcpStream::from_std(std_stream)?; 189 /// Ok(()) 190 /// } 191 /// ``` 192 /// 193 /// # Panics 194 /// 195 /// This function panics if it is not called from within a runtime with 196 /// IO enabled. 197 /// 198 /// The runtime is usually set implicitly when this function is called 199 /// from a future driven by a tokio runtime, otherwise runtime can be set 200 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 201 #[track_caller] from_std(stream: std::net::TcpStream) -> io::Result<TcpStream>202 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> { 203 let io = mio::net::TcpStream::from_std(stream); 204 let io = PollEvented::new(io)?; 205 Ok(TcpStream { io }) 206 } 207 208 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. 209 /// 210 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`. 211 /// Use [`set_nonblocking`] to change the blocking mode if needed. 
212 /// 213 /// # Examples 214 /// 215 /// ``` 216 /// use std::error::Error; 217 /// use std::io::Read; 218 /// use tokio::net::TcpListener; 219 /// # use tokio::net::TcpStream; 220 /// # use tokio::io::AsyncWriteExt; 221 /// 222 /// #[tokio::main] 223 /// async fn main() -> Result<(), Box<dyn Error>> { 224 /// let mut data = [0u8; 12]; 225 /// let listener = TcpListener::bind("127.0.0.1:34254").await?; 226 /// # let handle = tokio::spawn(async { 227 /// # let mut stream: TcpStream = TcpStream::connect("127.0.0.1:34254").await.unwrap(); 228 /// # stream.write(b"Hello world!").await.unwrap(); 229 /// # }); 230 /// let (tokio_tcp_stream, _) = listener.accept().await?; 231 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?; 232 /// # handle.await.expect("The task being joined has panicked"); 233 /// std_tcp_stream.set_nonblocking(false)?; 234 /// std_tcp_stream.read_exact(&mut data)?; 235 /// # assert_eq!(b"Hello world!", &data); 236 /// Ok(()) 237 /// } 238 /// ``` 239 /// [`tokio::net::TcpStream`]: TcpStream 240 /// [`std::net::TcpStream`]: std::net::TcpStream 241 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking into_std(self) -> io::Result<std::net::TcpStream>242 pub fn into_std(self) -> io::Result<std::net::TcpStream> { 243 #[cfg(unix)] 244 { 245 use std::os::unix::io::{FromRawFd, IntoRawFd}; 246 self.io 247 .into_inner() 248 .map(|io| io.into_raw_fd()) 249 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }) 250 } 251 252 #[cfg(windows)] 253 { 254 use std::os::windows::io::{FromRawSocket, IntoRawSocket}; 255 self.io 256 .into_inner() 257 .map(|io| io.into_raw_socket()) 258 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) }) 259 } 260 261 #[cfg(target_os = "wasi")] 262 { 263 use std::os::wasi::io::{FromRawFd, IntoRawFd}; 264 self.io 265 .into_inner() 266 .map(|io| io.into_raw_fd()) 267 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }) 268 } 269 } 270 271 /// Returns the local address 
that this stream is bound to. 272 /// 273 /// # Examples 274 /// 275 /// ```no_run 276 /// use tokio::net::TcpStream; 277 /// 278 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 279 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 280 /// 281 /// println!("{:?}", stream.local_addr()?); 282 /// # Ok(()) 283 /// # } 284 /// ``` local_addr(&self) -> io::Result<SocketAddr>285 pub fn local_addr(&self) -> io::Result<SocketAddr> { 286 self.io.local_addr() 287 } 288 289 /// Returns the value of the `SO_ERROR` option. take_error(&self) -> io::Result<Option<io::Error>>290 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 291 self.io.take_error() 292 } 293 294 /// Returns the remote address that this stream is connected to. 295 /// 296 /// # Examples 297 /// 298 /// ```no_run 299 /// use tokio::net::TcpStream; 300 /// 301 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 302 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 303 /// 304 /// println!("{:?}", stream.peer_addr()?); 305 /// # Ok(()) 306 /// # } 307 /// ``` peer_addr(&self) -> io::Result<SocketAddr>308 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 309 self.io.peer_addr() 310 } 311 312 /// Attempts to receive data on the socket, without removing that data from 313 /// the queue, registering the current task for wakeup if data is not yet 314 /// available. 315 /// 316 /// Note that on multiple calls to `poll_peek`, `poll_read` or 317 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the 318 /// most recent call is scheduled to receive a wakeup. (However, 319 /// `poll_write` retains a second, independent waker.) 320 /// 321 /// # Return value 322 /// 323 /// The function returns: 324 /// 325 /// * `Poll::Pending` if data is not yet available. 326 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked. 327 /// * `Poll::Ready(Err(e))` if an error is encountered. 
328 /// 329 /// # Errors 330 /// 331 /// This function may encounter any standard I/O error except `WouldBlock`. 332 /// 333 /// # Examples 334 /// 335 /// ```no_run 336 /// use tokio::io::{self, ReadBuf}; 337 /// use tokio::net::TcpStream; 338 /// 339 /// use futures::future::poll_fn; 340 /// 341 /// #[tokio::main] 342 /// async fn main() -> io::Result<()> { 343 /// let stream = TcpStream::connect("127.0.0.1:8000").await?; 344 /// let mut buf = [0; 10]; 345 /// let mut buf = ReadBuf::new(&mut buf); 346 /// 347 /// poll_fn(|cx| { 348 /// stream.poll_peek(cx, &mut buf) 349 /// }).await?; 350 /// 351 /// Ok(()) 352 /// } 353 /// ``` poll_peek( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<usize>>354 pub fn poll_peek( 355 &self, 356 cx: &mut Context<'_>, 357 buf: &mut ReadBuf<'_>, 358 ) -> Poll<io::Result<usize>> { 359 loop { 360 let ev = ready!(self.io.registration().poll_read_ready(cx))?; 361 362 let b = unsafe { 363 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 364 }; 365 366 match self.io.peek(b) { 367 Ok(ret) => { 368 unsafe { buf.assume_init(ret) }; 369 buf.advance(ret); 370 return Poll::Ready(Ok(ret)); 371 } 372 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 373 self.io.registration().clear_readiness(ev); 374 } 375 Err(e) => return Poll::Ready(Err(e)), 376 } 377 } 378 } 379 380 /// Waits for any of the requested ready states. 381 /// 382 /// This function is usually paired with `try_read()` or `try_write()`. It 383 /// can be used to concurrently read / write to the same socket on a single 384 /// task without splitting the socket. 385 /// 386 /// The function may complete without the socket being ready. This is a 387 /// false-positive and attempting an operation will return with 388 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty 389 /// [`Ready`] set, so you should always check the returned value and possibly 390 /// wait again if the requested states are not set. 
391 /// 392 /// # Cancel safety 393 /// 394 /// This method is cancel safe. Once a readiness event occurs, the method 395 /// will continue to return immediately until the readiness event is 396 /// consumed by an attempt to read or write that fails with `WouldBlock` or 397 /// `Poll::Pending`. 398 /// 399 /// # Examples 400 /// 401 /// Concurrently read and write to the stream on the same task without 402 /// splitting. 403 /// 404 /// ```no_run 405 /// use tokio::io::Interest; 406 /// use tokio::net::TcpStream; 407 /// use std::error::Error; 408 /// use std::io; 409 /// 410 /// #[tokio::main] 411 /// async fn main() -> Result<(), Box<dyn Error>> { 412 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 413 /// 414 /// loop { 415 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; 416 /// 417 /// if ready.is_readable() { 418 /// let mut data = vec![0; 1024]; 419 /// // Try to read data, this may still fail with `WouldBlock` 420 /// // if the readiness event is a false positive. 421 /// match stream.try_read(&mut data) { 422 /// Ok(n) => { 423 /// println!("read {} bytes", n); 424 /// } 425 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 426 /// continue; 427 /// } 428 /// Err(e) => { 429 /// return Err(e.into()); 430 /// } 431 /// } 432 /// 433 /// } 434 /// 435 /// if ready.is_writable() { 436 /// // Try to write data, this may still fail with `WouldBlock` 437 /// // if the readiness event is a false positive. 
438 /// match stream.try_write(b"hello world") { 439 /// Ok(n) => { 440 /// println!("write {} bytes", n); 441 /// } 442 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 443 /// continue 444 /// } 445 /// Err(e) => { 446 /// return Err(e.into()); 447 /// } 448 /// } 449 /// } 450 /// } 451 /// } 452 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>453 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 454 let event = self.io.registration().readiness(interest).await?; 455 Ok(event.ready) 456 } 457 458 /// Waits for the socket to become readable. 459 /// 460 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 461 /// paired with `try_read()`. 462 /// 463 /// # Cancel safety 464 /// 465 /// This method is cancel safe. Once a readiness event occurs, the method 466 /// will continue to return immediately until the readiness event is 467 /// consumed by an attempt to read that fails with `WouldBlock` or 468 /// `Poll::Pending`. 469 /// 470 /// # Examples 471 /// 472 /// ```no_run 473 /// use tokio::net::TcpStream; 474 /// use std::error::Error; 475 /// use std::io; 476 /// 477 /// #[tokio::main] 478 /// async fn main() -> Result<(), Box<dyn Error>> { 479 /// // Connect to a peer 480 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 481 /// 482 /// let mut msg = vec![0; 1024]; 483 /// 484 /// loop { 485 /// // Wait for the socket to be readable 486 /// stream.readable().await?; 487 /// 488 /// // Try to read data, this may still fail with `WouldBlock` 489 /// // if the readiness event is a false positive. 
490 /// match stream.try_read(&mut msg) { 491 /// Ok(n) => { 492 /// msg.truncate(n); 493 /// break; 494 /// } 495 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 496 /// continue; 497 /// } 498 /// Err(e) => { 499 /// return Err(e.into()); 500 /// } 501 /// } 502 /// } 503 /// 504 /// println!("GOT = {:?}", msg); 505 /// Ok(()) 506 /// } 507 /// ``` readable(&self) -> io::Result<()>508 pub async fn readable(&self) -> io::Result<()> { 509 self.ready(Interest::READABLE).await?; 510 Ok(()) 511 } 512 513 /// Polls for read readiness. 514 /// 515 /// If the tcp stream is not currently ready for reading, this method will 516 /// store a clone of the `Waker` from the provided `Context`. When the tcp 517 /// stream becomes ready for reading, `Waker::wake` will be called on the 518 /// waker. 519 /// 520 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or 521 /// `poll_peek`, only the `Waker` from the `Context` passed to the most 522 /// recent call is scheduled to receive a wakeup. (However, 523 /// `poll_write_ready` retains a second, independent waker.) 524 /// 525 /// This function is intended for cases where creating and pinning a future 526 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 527 /// preferred, as this supports polling from multiple tasks at once. 528 /// 529 /// # Return value 530 /// 531 /// The function returns: 532 /// 533 /// * `Poll::Pending` if the tcp stream is not ready for reading. 534 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading. 535 /// * `Poll::Ready(Err(e))` if an error is encountered. 536 /// 537 /// # Errors 538 /// 539 /// This function may encounter any standard I/O error except `WouldBlock`. 
540 /// 541 /// [`readable`]: method@Self::readable poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>542 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 543 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 544 } 545 546 /// Tries to read data from the stream into the provided buffer, returning how 547 /// many bytes were read. 548 /// 549 /// Receives any pending data from the socket but does not wait for new data 550 /// to arrive. On success, returns the number of bytes read. Because 551 /// `try_read()` is non-blocking, the buffer does not have to be stored by 552 /// the async task and can exist entirely on the stack. 553 /// 554 /// Usually, [`readable()`] or [`ready()`] is used with this function. 555 /// 556 /// [`readable()`]: TcpStream::readable() 557 /// [`ready()`]: TcpStream::ready() 558 /// 559 /// # Return 560 /// 561 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 562 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios: 563 /// 564 /// 1. The stream's read half is closed and will no longer yield data. 565 /// 2. The specified buffer was 0 bytes in length. 566 /// 567 /// If the stream is not ready to read data, 568 /// `Err(io::ErrorKind::WouldBlock)` is returned. 569 /// 570 /// # Examples 571 /// 572 /// ```no_run 573 /// use tokio::net::TcpStream; 574 /// use std::error::Error; 575 /// use std::io; 576 /// 577 /// #[tokio::main] 578 /// async fn main() -> Result<(), Box<dyn Error>> { 579 /// // Connect to a peer 580 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 581 /// 582 /// loop { 583 /// // Wait for the socket to be readable 584 /// stream.readable().await?; 585 /// 586 /// // Creating the buffer **after** the `await` prevents it from 587 /// // being stored in the async task. 
588 /// let mut buf = [0; 4096]; 589 /// 590 /// // Try to read data, this may still fail with `WouldBlock` 591 /// // if the readiness event is a false positive. 592 /// match stream.try_read(&mut buf) { 593 /// Ok(0) => break, 594 /// Ok(n) => { 595 /// println!("read {} bytes", n); 596 /// } 597 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 598 /// continue; 599 /// } 600 /// Err(e) => { 601 /// return Err(e.into()); 602 /// } 603 /// } 604 /// } 605 /// 606 /// Ok(()) 607 /// } 608 /// ``` try_read(&self, buf: &mut [u8]) -> io::Result<usize>609 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { 610 use std::io::Read; 611 612 self.io 613 .registration() 614 .try_io(Interest::READABLE, || (&*self.io).read(buf)) 615 } 616 617 /// Tries to read data from the stream into the provided buffers, returning 618 /// how many bytes were read. 619 /// 620 /// Data is copied to fill each buffer in order, with the final buffer 621 /// written to possibly being only partially filled. This method behaves 622 /// equivalently to a single call to [`try_read()`] with concatenated 623 /// buffers. 624 /// 625 /// Receives any pending data from the socket but does not wait for new data 626 /// to arrive. On success, returns the number of bytes read. Because 627 /// `try_read_vectored()` is non-blocking, the buffer does not have to be 628 /// stored by the async task and can exist entirely on the stack. 629 /// 630 /// Usually, [`readable()`] or [`ready()`] is used with this function. 631 /// 632 /// [`try_read()`]: TcpStream::try_read() 633 /// [`readable()`]: TcpStream::readable() 634 /// [`ready()`]: TcpStream::ready() 635 /// 636 /// # Return 637 /// 638 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 639 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 640 /// and will no longer yield data. If the stream is not ready to read data 641 /// `Err(io::ErrorKind::WouldBlock)` is returned. 
642 /// 643 /// # Examples 644 /// 645 /// ```no_run 646 /// use tokio::net::TcpStream; 647 /// use std::error::Error; 648 /// use std::io::{self, IoSliceMut}; 649 /// 650 /// #[tokio::main] 651 /// async fn main() -> Result<(), Box<dyn Error>> { 652 /// // Connect to a peer 653 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 654 /// 655 /// loop { 656 /// // Wait for the socket to be readable 657 /// stream.readable().await?; 658 /// 659 /// // Creating the buffer **after** the `await` prevents it from 660 /// // being stored in the async task. 661 /// let mut buf_a = [0; 512]; 662 /// let mut buf_b = [0; 1024]; 663 /// let mut bufs = [ 664 /// IoSliceMut::new(&mut buf_a), 665 /// IoSliceMut::new(&mut buf_b), 666 /// ]; 667 /// 668 /// // Try to read data, this may still fail with `WouldBlock` 669 /// // if the readiness event is a false positive. 670 /// match stream.try_read_vectored(&mut bufs) { 671 /// Ok(0) => break, 672 /// Ok(n) => { 673 /// println!("read {} bytes", n); 674 /// } 675 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 676 /// continue; 677 /// } 678 /// Err(e) => { 679 /// return Err(e.into()); 680 /// } 681 /// } 682 /// } 683 /// 684 /// Ok(()) 685 /// } 686 /// ``` try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>687 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { 688 use std::io::Read; 689 690 self.io 691 .registration() 692 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) 693 } 694 695 cfg_io_util! { 696 /// Tries to read data from the stream into the provided buffer, advancing the 697 /// buffer's internal cursor, returning how many bytes were read. 698 /// 699 /// Receives any pending data from the socket but does not wait for new data 700 /// to arrive. On success, returns the number of bytes read. 
Because 701 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by 702 /// the async task and can exist entirely on the stack. 703 /// 704 /// Usually, [`readable()`] or [`ready()`] is used with this function. 705 /// 706 /// [`readable()`]: TcpStream::readable() 707 /// [`ready()`]: TcpStream::ready() 708 /// 709 /// # Return 710 /// 711 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 712 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 713 /// and will no longer yield data. If the stream is not ready to read data 714 /// `Err(io::ErrorKind::WouldBlock)` is returned. 715 /// 716 /// # Examples 717 /// 718 /// ```no_run 719 /// use tokio::net::TcpStream; 720 /// use std::error::Error; 721 /// use std::io; 722 /// 723 /// #[tokio::main] 724 /// async fn main() -> Result<(), Box<dyn Error>> { 725 /// // Connect to a peer 726 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 727 /// 728 /// loop { 729 /// // Wait for the socket to be readable 730 /// stream.readable().await?; 731 /// 732 /// let mut buf = Vec::with_capacity(4096); 733 /// 734 /// // Try to read data, this may still fail with `WouldBlock` 735 /// // if the readiness event is a false positive. 
736 /// match stream.try_read_buf(&mut buf) { 737 /// Ok(0) => break, 738 /// Ok(n) => { 739 /// println!("read {} bytes", n); 740 /// } 741 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 742 /// continue; 743 /// } 744 /// Err(e) => { 745 /// return Err(e.into()); 746 /// } 747 /// } 748 /// } 749 /// 750 /// Ok(()) 751 /// } 752 /// ``` 753 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { 754 self.io.registration().try_io(Interest::READABLE, || { 755 use std::io::Read; 756 757 let dst = buf.chunk_mut(); 758 let dst = 759 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 760 761 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the 762 // buffer. 763 let n = (&*self.io).read(dst)?; 764 765 unsafe { 766 buf.advance_mut(n); 767 } 768 769 Ok(n) 770 }) 771 } 772 } 773 774 /// Waits for the socket to become writable. 775 /// 776 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually 777 /// paired with `try_write()`. 778 /// 779 /// # Cancel safety 780 /// 781 /// This method is cancel safe. Once a readiness event occurs, the method 782 /// will continue to return immediately until the readiness event is 783 /// consumed by an attempt to write that fails with `WouldBlock` or 784 /// `Poll::Pending`. 785 /// 786 /// # Examples 787 /// 788 /// ```no_run 789 /// use tokio::net::TcpStream; 790 /// use std::error::Error; 791 /// use std::io; 792 /// 793 /// #[tokio::main] 794 /// async fn main() -> Result<(), Box<dyn Error>> { 795 /// // Connect to a peer 796 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 797 /// 798 /// loop { 799 /// // Wait for the socket to be writable 800 /// stream.writable().await?; 801 /// 802 /// // Try to write data, this may still fail with `WouldBlock` 803 /// // if the readiness event is a false positive. 
804 /// match stream.try_write(b"hello world") { 805 /// Ok(n) => { 806 /// break; 807 /// } 808 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 809 /// continue; 810 /// } 811 /// Err(e) => { 812 /// return Err(e.into()); 813 /// } 814 /// } 815 /// } 816 /// 817 /// Ok(()) 818 /// } 819 /// ``` writable(&self) -> io::Result<()>820 pub async fn writable(&self) -> io::Result<()> { 821 self.ready(Interest::WRITABLE).await?; 822 Ok(()) 823 } 824 825 /// Polls for write readiness. 826 /// 827 /// If the tcp stream is not currently ready for writing, this method will 828 /// store a clone of the `Waker` from the provided `Context`. When the tcp 829 /// stream becomes ready for writing, `Waker::wake` will be called on the 830 /// waker. 831 /// 832 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only 833 /// the `Waker` from the `Context` passed to the most recent call is 834 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a 835 /// second, independent waker.) 836 /// 837 /// This function is intended for cases where creating and pinning a future 838 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 839 /// preferred, as this supports polling from multiple tasks at once. 840 /// 841 /// # Return value 842 /// 843 /// The function returns: 844 /// 845 /// * `Poll::Pending` if the tcp stream is not ready for writing. 846 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing. 847 /// * `Poll::Ready(Err(e))` if an error is encountered. 848 /// 849 /// # Errors 850 /// 851 /// This function may encounter any standard I/O error except `WouldBlock`. 
852 /// 853 /// [`writable`]: method@Self::writable poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>854 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 855 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 856 } 857 858 /// Try to write a buffer to the stream, returning how many bytes were 859 /// written. 860 /// 861 /// The function will attempt to write the entire contents of `buf`, but 862 /// only part of the buffer may be written. 863 /// 864 /// This function is usually paired with `writable()`. 865 /// 866 /// # Return 867 /// 868 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 869 /// number of bytes written. If the stream is not ready to write data, 870 /// `Err(io::ErrorKind::WouldBlock)` is returned. 871 /// 872 /// # Examples 873 /// 874 /// ```no_run 875 /// use tokio::net::TcpStream; 876 /// use std::error::Error; 877 /// use std::io; 878 /// 879 /// #[tokio::main] 880 /// async fn main() -> Result<(), Box<dyn Error>> { 881 /// // Connect to a peer 882 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 883 /// 884 /// loop { 885 /// // Wait for the socket to be writable 886 /// stream.writable().await?; 887 /// 888 /// // Try to write data, this may still fail with `WouldBlock` 889 /// // if the readiness event is a false positive. 
890 /// match stream.try_write(b"hello world") { 891 /// Ok(n) => { 892 /// break; 893 /// } 894 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 895 /// continue; 896 /// } 897 /// Err(e) => { 898 /// return Err(e.into()); 899 /// } 900 /// } 901 /// } 902 /// 903 /// Ok(()) 904 /// } 905 /// ``` try_write(&self, buf: &[u8]) -> io::Result<usize>906 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { 907 use std::io::Write; 908 909 self.io 910 .registration() 911 .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) 912 } 913 914 /// Tries to write several buffers to the stream, returning how many bytes 915 /// were written. 916 /// 917 /// Data is written from each buffer in order, with the final buffer read 918 /// from possible being only partially consumed. This method behaves 919 /// equivalently to a single call to [`try_write()`] with concatenated 920 /// buffers. 921 /// 922 /// This function is usually paired with `writable()`. 923 /// 924 /// [`try_write()`]: TcpStream::try_write() 925 /// 926 /// # Return 927 /// 928 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 929 /// number of bytes written. If the stream is not ready to write data, 930 /// `Err(io::ErrorKind::WouldBlock)` is returned. 931 /// 932 /// # Examples 933 /// 934 /// ```no_run 935 /// use tokio::net::TcpStream; 936 /// use std::error::Error; 937 /// use std::io; 938 /// 939 /// #[tokio::main] 940 /// async fn main() -> Result<(), Box<dyn Error>> { 941 /// // Connect to a peer 942 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 943 /// 944 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; 945 /// 946 /// loop { 947 /// // Wait for the socket to be writable 948 /// stream.writable().await?; 949 /// 950 /// // Try to write data, this may still fail with `WouldBlock` 951 /// // if the readiness event is a false positive. 
952 /// match stream.try_write_vectored(&bufs) { 953 /// Ok(n) => { 954 /// break; 955 /// } 956 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 957 /// continue; 958 /// } 959 /// Err(e) => { 960 /// return Err(e.into()); 961 /// } 962 /// } 963 /// } 964 /// 965 /// Ok(()) 966 /// } 967 /// ``` try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize>968 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { 969 use std::io::Write; 970 971 self.io 972 .registration() 973 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs)) 974 } 975 976 /// Tries to read or write from the socket using a user-provided IO operation. 977 /// 978 /// If the socket is ready, the provided closure is called. The closure 979 /// should attempt to perform IO operation on the socket by manually 980 /// calling the appropriate syscall. If the operation fails because the 981 /// socket is not actually ready, then the closure should return a 982 /// `WouldBlock` error and the readiness flag is cleared. The return value 983 /// of the closure is then returned by `try_io`. 984 /// 985 /// If the socket is not ready, then the closure is not called 986 /// and a `WouldBlock` error is returned. 987 /// 988 /// The closure should only return a `WouldBlock` error if it has performed 989 /// an IO operation on the socket that failed due to the socket not being 990 /// ready. Returning a `WouldBlock` error in any other situation will 991 /// incorrectly clear the readiness flag, which can cause the socket to 992 /// behave incorrectly. 993 /// 994 /// The closure should not perform the IO operation using any of the methods 995 /// defined on the Tokio `TcpStream` type, as this will mess with the 996 /// readiness flag and can cause the socket to behave incorrectly. 997 /// 998 /// This method is not intended to be used with combined interests. 
999 /// The closure should perform only one type of IO operation, so it should not 1000 /// require more than one ready state. This method may panic or sleep forever 1001 /// if it is called with a combined interest. 1002 /// 1003 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 1004 /// 1005 /// [`readable()`]: TcpStream::readable() 1006 /// [`writable()`]: TcpStream::writable() 1007 /// [`ready()`]: TcpStream::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1008 pub fn try_io<R>( 1009 &self, 1010 interest: Interest, 1011 f: impl FnOnce() -> io::Result<R>, 1012 ) -> io::Result<R> { 1013 self.io 1014 .registration() 1015 .try_io(interest, || self.io.try_io(f)) 1016 } 1017 1018 /// Reads or writes from the socket using a user-provided IO operation. 1019 /// 1020 /// The readiness of the socket is awaited and when the socket is ready, 1021 /// the provided closure is called. The closure should attempt to perform 1022 /// IO operation on the socket by manually calling the appropriate syscall. 1023 /// If the operation fails because the socket is not actually ready, 1024 /// then the closure should return a `WouldBlock` error. In such case the 1025 /// readiness flag is cleared and the socket readiness is awaited again. 1026 /// This loop is repeated until the closure returns an `Ok` or an error 1027 /// other than `WouldBlock`. 1028 /// 1029 /// The closure should only return a `WouldBlock` error if it has performed 1030 /// an IO operation on the socket that failed due to the socket not being 1031 /// ready. Returning a `WouldBlock` error in any other situation will 1032 /// incorrectly clear the readiness flag, which can cause the socket to 1033 /// behave incorrectly. 
1034 /// 1035 /// The closure should not perform the IO operation using any of the methods 1036 /// defined on the Tokio `TcpStream` type, as this will mess with the 1037 /// readiness flag and can cause the socket to behave incorrectly. 1038 /// 1039 /// This method is not intended to be used with combined interests. 1040 /// The closure should perform only one type of IO operation, so it should not 1041 /// require more than one ready state. This method may panic or sleep forever 1042 /// if it is called with a combined interest. async_io<R>( &self, interest: Interest, mut f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>1043 pub async fn async_io<R>( 1044 &self, 1045 interest: Interest, 1046 mut f: impl FnMut() -> io::Result<R>, 1047 ) -> io::Result<R> { 1048 self.io 1049 .registration() 1050 .async_io(interest, || self.io.try_io(&mut f)) 1051 .await 1052 } 1053 1054 /// Receives data on the socket from the remote address to which it is 1055 /// connected, without removing that data from the queue. On success, 1056 /// returns the number of bytes peeked. 1057 /// 1058 /// Successive calls return the same data. This is accomplished by passing 1059 /// `MSG_PEEK` as a flag to the underlying recv system call. 
1060 /// 1061 /// # Examples 1062 /// 1063 /// ```no_run 1064 /// use tokio::net::TcpStream; 1065 /// use tokio::io::AsyncReadExt; 1066 /// use std::error::Error; 1067 /// 1068 /// #[tokio::main] 1069 /// async fn main() -> Result<(), Box<dyn Error>> { 1070 /// // Connect to a peer 1071 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; 1072 /// 1073 /// let mut b1 = [0; 10]; 1074 /// let mut b2 = [0; 10]; 1075 /// 1076 /// // Peek at the data 1077 /// let n = stream.peek(&mut b1).await?; 1078 /// 1079 /// // Read the data 1080 /// assert_eq!(n, stream.read(&mut b2[..n]).await?); 1081 /// assert_eq!(&b1[..n], &b2[..n]); 1082 /// 1083 /// Ok(()) 1084 /// } 1085 /// ``` 1086 /// 1087 /// The [`read`] method is defined on the [`AsyncReadExt`] trait. 1088 /// 1089 /// [`read`]: fn@crate::io::AsyncReadExt::read 1090 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt peek(&self, buf: &mut [u8]) -> io::Result<usize>1091 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { 1092 self.io 1093 .registration() 1094 .async_io(Interest::READABLE, || self.io.peek(buf)) 1095 .await 1096 } 1097 1098 /// Shuts down the read, write, or both halves of this connection. 1099 /// 1100 /// This function will cause all pending and future I/O on the specified 1101 /// portions to return immediately with an appropriate value (see the 1102 /// documentation of `Shutdown`). shutdown_std(&self, how: Shutdown) -> io::Result<()>1103 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> { 1104 self.io.shutdown(how) 1105 } 1106 1107 /// Gets the value of the `TCP_NODELAY` option on this socket. 1108 /// 1109 /// For more information about this option, see [`set_nodelay`]. 
///
    /// [`set_nodelay`]: TcpStream::set_nodelay
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::net::TcpStream;
    ///
    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
    ///
    /// println!("{:?}", stream.nodelay()?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn nodelay(&self) -> io::Result<bool> {
        // Forwarded to the inner socket; its result is returned as-is.
        self.io.nodelay()
    }

    /// Sets the value of the `TCP_NODELAY` option on this socket.
    ///
    /// If set, this option disables the Nagle algorithm. This means that
    /// segments are always sent as soon as possible, even if there is only a
    /// small amount of data. When not set, data is buffered until there is a
    /// sufficient amount to send out, thereby avoiding the frequent sending of
    /// small packets.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::net::TcpStream;
    ///
    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
    ///
    /// stream.set_nodelay(true)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
        // Forwarded to the inner socket; its result is returned as-is.
        self.io.set_nodelay(nodelay)
    }

    cfg_not_wasi! {
        /// Reads the linger duration for this socket by getting the `SO_LINGER`
        /// option.
        ///
        /// For more information about this option, see [`set_linger`].
///
        /// [`set_linger`]: TcpStream::set_linger
        ///
        /// # Examples
        ///
        /// ```no_run
        /// use tokio::net::TcpStream;
        ///
        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
        ///
        /// println!("{:?}", stream.linger()?);
        /// # Ok(())
        /// # }
        /// ```
        pub fn linger(&self) -> io::Result<Option<Duration>> {
            // View this stream as a `socket2::SockRef` to query the option.
            socket2::SockRef::from(self).linger()
        }

        /// Sets the linger duration of this socket by setting the `SO_LINGER` option.
        ///
        /// This option controls the action taken when a stream has unsent messages and the stream is
        /// closed. If `SO_LINGER` is set, the system shall block the process until it can transmit the
        /// data or until the time expires.
        ///
        /// If `SO_LINGER` is not specified, and the stream is closed, the system handles the call in a
        /// way that allows the process to continue as quickly as possible.
        ///
        /// # Examples
        ///
        /// ```no_run
        /// use tokio::net::TcpStream;
        ///
        /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
        /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
        ///
        /// stream.set_linger(None)?;
        /// # Ok(())
        /// # }
        /// ```
        pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
            // View this stream as a `socket2::SockRef` to set the option.
            socket2::SockRef::from(self).set_linger(dur)
        }
    }

    /// Gets the value of the `IP_TTL` option for this socket.
    ///
    /// For more information about this option, see [`set_ttl`].
///
    /// [`set_ttl`]: TcpStream::set_ttl
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::net::TcpStream;
    ///
    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
    ///
    /// println!("{:?}", stream.ttl()?);
    /// # Ok(())
    /// # }
    /// ```
    pub fn ttl(&self) -> io::Result<u32> {
        // Forwarded to the inner socket; its result is returned as-is.
        self.io.ttl()
    }

    /// Sets the value for the `IP_TTL` option on this socket.
    ///
    /// This value sets the time-to-live field that is used in every packet sent
    /// from this socket.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::net::TcpStream;
    ///
    /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
    /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
    ///
    /// stream.set_ttl(123)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
        // Forwarded to the inner socket; its result is returned as-is.
        self.io.set_ttl(ttl)
    }

    // These lifetime markers also appear in the generated documentation, and make
    // it more clear that this is a *borrowed* split.
    #[allow(clippy::needless_lifetimes)]
    /// Splits a `TcpStream` into a read half and a write half, which can be used
    /// to read and write the stream concurrently.
    ///
    /// This method is more efficient than [`into_split`], but the halves cannot be
    /// moved into independently spawned tasks.
    ///
    /// [`into_split`]: TcpStream::into_split()
    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
        // Calls the free function `split` imported from `tcp::split`, not
        // this method.
        split(self)
    }

    /// Splits a `TcpStream` into a read half and a write half, which can be used
    /// to read and write the stream concurrently.
///
    /// Unlike [`split`], the owned halves can be moved to separate tasks, however
    /// this comes at the cost of a heap allocation.
    ///
    /// **Note:** Dropping the write half will shut down the write half of the TCP
    /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
    ///
    /// [`split`]: TcpStream::split()
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
        // Calls the free function `split_owned` imported from
        // `tcp::split_owned`.
        split_owned(self)
    }

    // == Poll IO functions that take `&self` ==
    //
    // To read or write without mutable access to the `TcpStream`, combine the
    // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
    // `try_write` methods.

    /// Polls a read into `buf` through a shared reference, forwarding to
    /// `poll_read` on the inner I/O object.
    pub(crate) fn poll_read_priv(
        &self,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
        unsafe { self.io.poll_read(cx, buf) }
    }

    /// Polls a write of `buf` through a shared reference, forwarding to
    /// `poll_write` on the inner I/O object.
    pub(super) fn poll_write_priv(
        &self,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write(cx, buf)
    }

    /// Polls a vectored write of `bufs` through a shared reference, forwarding
    /// to `poll_write_vectored` on the inner I/O object.
    pub(super) fn poll_write_vectored_priv(
        &self,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write_vectored(cx, bufs)
    }
}

impl TryFrom<std::net::TcpStream> for TcpStream {
    type Error = io::Error;

    /// Consumes stream, returning the tokio I/O object.
1311 /// 1312 /// This is equivalent to 1313 /// [`TcpStream::from_std(stream)`](TcpStream::from_std). try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error>1314 fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> { 1315 Self::from_std(stream) 1316 } 1317 } 1318 1319 // ===== impl Read / Write ===== 1320 1321 impl AsyncRead for TcpStream { poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1322 fn poll_read( 1323 self: Pin<&mut Self>, 1324 cx: &mut Context<'_>, 1325 buf: &mut ReadBuf<'_>, 1326 ) -> Poll<io::Result<()>> { 1327 self.poll_read_priv(cx, buf) 1328 } 1329 } 1330 1331 impl AsyncWrite for TcpStream { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1332 fn poll_write( 1333 self: Pin<&mut Self>, 1334 cx: &mut Context<'_>, 1335 buf: &[u8], 1336 ) -> Poll<io::Result<usize>> { 1337 self.poll_write_priv(cx, buf) 1338 } 1339 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1340 fn poll_write_vectored( 1341 self: Pin<&mut Self>, 1342 cx: &mut Context<'_>, 1343 bufs: &[io::IoSlice<'_>], 1344 ) -> Poll<io::Result<usize>> { 1345 self.poll_write_vectored_priv(cx, bufs) 1346 } 1347 is_write_vectored(&self) -> bool1348 fn is_write_vectored(&self) -> bool { 1349 true 1350 } 1351 1352 #[inline] poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1353 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 1354 // tcp flush is a no-op 1355 Poll::Ready(Ok(())) 1356 } 1357 poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1358 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { 1359 self.shutdown_std(std::net::Shutdown::Write)?; 1360 Poll::Ready(Ok(())) 1361 } 1362 } 1363 1364 impl fmt::Debug for TcpStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1365 
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Debug output is whatever the inner I/O object prints.
        self.io.fmt(f)
    }
}

// Raw and borrowed file-descriptor access on Unix targets.
#[cfg(unix)]
mod sys {
    use super::TcpStream;
    use std::os::unix::prelude::*;

    impl AsRawFd for TcpStream {
        fn as_raw_fd(&self) -> RawFd {
            self.io.as_raw_fd()
        }
    }

    impl AsFd for TcpStream {
        fn as_fd(&self) -> BorrowedFd<'_> {
            // SAFETY: the returned `BorrowedFd` borrows `self`, so it cannot
            // outlive this stream.
            // NOTE(review): relies on `self.io` keeping the descriptor open
            // for as long as `self` is alive - confirm.
            unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
        }
    }
}

// Raw and borrowed socket access on Windows targets.
cfg_windows! {
    use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};

    impl AsRawSocket for TcpStream {
        fn as_raw_socket(&self) -> RawSocket {
            self.io.as_raw_socket()
        }
    }

    impl AsSocket for TcpStream {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            // SAFETY: the returned `BorrowedSocket` borrows `self`, so it
            // cannot outlive this stream.
            // NOTE(review): relies on `self.io` keeping the socket open for
            // as long as `self` is alive - confirm.
            unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
        }
    }
}

// Raw and borrowed file-descriptor access on WASI (unstable builds only).
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
    use super::TcpStream;
    use std::os::wasi::prelude::*;

    impl AsRawFd for TcpStream {
        fn as_raw_fd(&self) -> RawFd {
            self.io.as_raw_fd()
        }
    }

    impl AsFd for TcpStream {
        fn as_fd(&self) -> BorrowedFd<'_> {
            // SAFETY: the returned `BorrowedFd` borrows `self`, so it cannot
            // outlive this stream.
            // NOTE(review): relies on `self.io` keeping the descriptor open
            // for as long as `self` is alive - confirm.
            unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
        }
    }
}