use crate::future::poll_fn;
use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
use crate::net::{to_socket_addrs, ToSocketAddrs};

use std::convert::TryFrom;
use std::fmt;
use std::io;
use std::net::{Shutdown, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;

cfg_io_util! {
    use bytes::BufMut;
}

cfg_net! {
    /// A TCP stream between a local and a remote socket.
    ///
    /// A TCP stream can either be created by connecting to an endpoint, via the
    /// [`connect`] method, or by [accepting] a connection from a [listener]. A
    /// TCP stream can also be created via the [`TcpSocket`] type.
    ///
    /// Reading and writing to a `TcpStream` is usually done using the
    /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
    /// traits.
    ///
    /// [`connect`]: method@TcpStream::connect
    /// [accepting]: method@crate::net::TcpListener::accept
    /// [listener]: struct@crate::net::TcpListener
    /// [`TcpSocket`]: struct@crate::net::TcpSocket
    /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use tokio::net::TcpStream;
    /// use tokio::io::AsyncWriteExt;
    /// use std::error::Error;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn Error>> {
    ///     // Connect to a peer
    ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
    ///
    ///     // Write some data.
    ///     stream.write_all(b"hello world!").await?;
    ///
    ///     Ok(())
    /// }
    /// ```
    ///
    /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
    ///
    /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
    /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
    ///
    /// To shut down the stream in the write direction, you can call the
    /// [`shutdown()`] method. This will cause the other peer to receive a read of
    /// length 0, indicating that no more data will be sent. This only closes
    /// the stream in one direction.
    ///
    /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    pub struct TcpStream {
        // The mio TCP socket wrapped in `PollEvented`. The methods of this
        // type obtain readiness information through `io.registration()` and
        // perform the actual socket calls on the wrapped mio stream.
        io: PollEvented<mio::net::TcpStream>,
    }
}

impl TcpStream {
/// Opens a TCP connection to a remote host.
///
/// `addr` is an address of the remote host. Anything which implements the
/// [`ToSocketAddrs`] trait can be supplied as the address. If `addr`
/// yields multiple addresses, connect will be attempted with each of the
/// addresses until a connection is successful. If none of the addresses
/// result in a successful connection, the error returned from the last
/// connection attempt (the last address) is returned.
///
/// To configure the socket before connecting, you can use the [`TcpSocket`]
/// type.
///
/// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
/// [`TcpSocket`]: struct@crate::net::TcpSocket
///
/// # Examples
///
/// ```no_run
/// use tokio::net::TcpStream;
/// use tokio::io::AsyncWriteExt;
/// use std::error::Error;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn Error>> {
///     // Connect to a peer
///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
///
///     // Write some data.
///     stream.write_all(b"hello world!").await?;
///
///     Ok(())
/// }
/// ```
///
/// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
108 /// 109 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all 110 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream>111 pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> { 112 let addrs = to_socket_addrs(addr).await?; 113 114 let mut last_err = None; 115 116 for addr in addrs { 117 match TcpStream::connect_addr(addr).await { 118 Ok(stream) => return Ok(stream), 119 Err(e) => last_err = Some(e), 120 } 121 } 122 123 Err(last_err.unwrap_or_else(|| { 124 io::Error::new( 125 io::ErrorKind::InvalidInput, 126 "could not resolve to any address", 127 ) 128 })) 129 } 130 131 /// Establishes a connection to the specified `addr`. connect_addr(addr: SocketAddr) -> io::Result<TcpStream>132 async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> { 133 let sys = mio::net::TcpStream::connect(addr)?; 134 TcpStream::connect_mio(sys).await 135 } 136 connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream>137 pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> { 138 let stream = TcpStream::new(sys)?; 139 140 // Once we've connected, wait for the stream to be writable as 141 // that's when the actual connection has been initiated. Once we're 142 // writable we check for `take_socket_error` to see if the connect 143 // actually hit an error or not. 144 // 145 // If all that succeeded then we ship everything on up. 146 poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?; 147 148 if let Some(e) = stream.io.take_error()? { 149 return Err(e); 150 } 151 152 Ok(stream) 153 } 154 new(connected: mio::net::TcpStream) -> io::Result<TcpStream>155 pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> { 156 let io = PollEvented::new(connected)?; 157 Ok(TcpStream { io }) 158 } 159 160 /// Creates new `TcpStream` from a `std::net::TcpStream`. 
161 /// 162 /// This function is intended to be used to wrap a TCP stream from the 163 /// standard library in the Tokio equivalent. The conversion assumes nothing 164 /// about the underlying stream; it is left up to the user to set it in 165 /// non-blocking mode. 166 /// 167 /// # Examples 168 /// 169 /// ```rust,no_run 170 /// use std::error::Error; 171 /// use tokio::net::TcpStream; 172 /// 173 /// #[tokio::main] 174 /// async fn main() -> Result<(), Box<dyn Error>> { 175 /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?; 176 /// std_stream.set_nonblocking(true)?; 177 /// let stream = TcpStream::from_std(std_stream)?; 178 /// Ok(()) 179 /// } 180 /// ``` 181 /// 182 /// # Panics 183 /// 184 /// This function panics if thread-local runtime is not set. 185 /// 186 /// The runtime is usually set implicitly when this function is called 187 /// from a future driven by a tokio runtime, otherwise runtime can be set 188 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. from_std(stream: std::net::TcpStream) -> io::Result<TcpStream>189 pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> { 190 let io = mio::net::TcpStream::from_std(stream); 191 let io = PollEvented::new(io)?; 192 Ok(TcpStream { io }) 193 } 194 195 /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`]. 196 /// 197 /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`. 198 /// Use [`set_nonblocking`] to change the blocking mode if needed. 
199 /// 200 /// # Examples 201 /// 202 /// ``` 203 /// use std::error::Error; 204 /// use std::io::Read; 205 /// use tokio::net::TcpListener; 206 /// # use tokio::net::TcpStream; 207 /// # use tokio::io::AsyncWriteExt; 208 /// 209 /// #[tokio::main] 210 /// async fn main() -> Result<(), Box<dyn Error>> { 211 /// let mut data = [0u8; 12]; 212 /// let listener = TcpListener::bind("127.0.0.1:34254").await?; 213 /// # let handle = tokio::spawn(async { 214 /// # let mut stream: TcpStream = TcpStream::connect("127.0.0.1:34254").await.unwrap(); 215 /// # stream.write(b"Hello world!").await.unwrap(); 216 /// # }); 217 /// let (tokio_tcp_stream, _) = listener.accept().await?; 218 /// let mut std_tcp_stream = tokio_tcp_stream.into_std()?; 219 /// # handle.await.expect("The task being joined has panicked"); 220 /// std_tcp_stream.set_nonblocking(false)?; 221 /// std_tcp_stream.read_exact(&mut data)?; 222 /// # assert_eq!(b"Hello world!", &data); 223 /// Ok(()) 224 /// } 225 /// ``` 226 /// [`tokio::net::TcpStream`]: TcpStream 227 /// [`std::net::TcpStream`]: std::net::TcpStream 228 /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking into_std(self) -> io::Result<std::net::TcpStream>229 pub fn into_std(self) -> io::Result<std::net::TcpStream> { 230 #[cfg(unix)] 231 { 232 use std::os::unix::io::{FromRawFd, IntoRawFd}; 233 self.io 234 .into_inner() 235 .map(|io| io.into_raw_fd()) 236 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) }) 237 } 238 239 #[cfg(windows)] 240 { 241 use std::os::windows::io::{FromRawSocket, IntoRawSocket}; 242 self.io 243 .into_inner() 244 .map(|io| io.into_raw_socket()) 245 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) }) 246 } 247 } 248 249 /// Returns the local address that this stream is bound to. 
250 /// 251 /// # Examples 252 /// 253 /// ```no_run 254 /// use tokio::net::TcpStream; 255 /// 256 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 257 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 258 /// 259 /// println!("{:?}", stream.local_addr()?); 260 /// # Ok(()) 261 /// # } 262 /// ``` local_addr(&self) -> io::Result<SocketAddr>263 pub fn local_addr(&self) -> io::Result<SocketAddr> { 264 self.io.local_addr() 265 } 266 267 /// Returns the remote address that this stream is connected to. 268 /// 269 /// # Examples 270 /// 271 /// ```no_run 272 /// use tokio::net::TcpStream; 273 /// 274 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 275 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 276 /// 277 /// println!("{:?}", stream.peer_addr()?); 278 /// # Ok(()) 279 /// # } 280 /// ``` peer_addr(&self) -> io::Result<SocketAddr>281 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 282 self.io.peer_addr() 283 } 284 285 /// Attempts to receive data on the socket, without removing that data from 286 /// the queue, registering the current task for wakeup if data is not yet 287 /// available. 288 /// 289 /// Note that on multiple calls to `poll_peek`, `poll_read` or 290 /// `poll_read_ready`, only the `Waker` from the `Context` passed to the 291 /// most recent call is scheduled to receive a wakeup. (However, 292 /// `poll_write` retains a second, independent waker.) 293 /// 294 /// # Return value 295 /// 296 /// The function returns: 297 /// 298 /// * `Poll::Pending` if data is not yet available. 299 /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked. 300 /// * `Poll::Ready(Err(e))` if an error is encountered. 301 /// 302 /// # Errors 303 /// 304 /// This function may encounter any standard I/O error except `WouldBlock`. 
305 /// 306 /// # Examples 307 /// 308 /// ```no_run 309 /// use tokio::io::{self, ReadBuf}; 310 /// use tokio::net::TcpStream; 311 /// 312 /// use futures::future::poll_fn; 313 /// 314 /// #[tokio::main] 315 /// async fn main() -> io::Result<()> { 316 /// let stream = TcpStream::connect("127.0.0.1:8000").await?; 317 /// let mut buf = [0; 10]; 318 /// let mut buf = ReadBuf::new(&mut buf); 319 /// 320 /// poll_fn(|cx| { 321 /// stream.poll_peek(cx, &mut buf) 322 /// }).await?; 323 /// 324 /// Ok(()) 325 /// } 326 /// ``` poll_peek( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<usize>>327 pub fn poll_peek( 328 &self, 329 cx: &mut Context<'_>, 330 buf: &mut ReadBuf<'_>, 331 ) -> Poll<io::Result<usize>> { 332 loop { 333 let ev = ready!(self.io.registration().poll_read_ready(cx))?; 334 335 let b = unsafe { 336 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) 337 }; 338 339 match self.io.peek(b) { 340 Ok(ret) => { 341 unsafe { buf.assume_init(ret) }; 342 buf.advance(ret); 343 return Poll::Ready(Ok(ret)); 344 } 345 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 346 self.io.registration().clear_readiness(ev); 347 } 348 Err(e) => return Poll::Ready(Err(e)), 349 } 350 } 351 } 352 353 /// Waits for any of the requested ready states. 354 /// 355 /// This function is usually paired with `try_read()` or `try_write()`. It 356 /// can be used to concurrently read / write to the same socket on a single 357 /// task without splitting the socket. 358 /// 359 /// # Cancel safety 360 /// 361 /// This method is cancel safe. Once a readiness event occurs, the method 362 /// will continue to return immediately until the readiness event is 363 /// consumed by an attempt to read or write that fails with `WouldBlock` or 364 /// `Poll::Pending`. 365 /// 366 /// # Examples 367 /// 368 /// Concurrently read and write to the stream on the same task without 369 /// splitting. 
370 /// 371 /// ```no_run 372 /// use tokio::io::Interest; 373 /// use tokio::net::TcpStream; 374 /// use std::error::Error; 375 /// use std::io; 376 /// 377 /// #[tokio::main] 378 /// async fn main() -> Result<(), Box<dyn Error>> { 379 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 380 /// 381 /// loop { 382 /// let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?; 383 /// 384 /// if ready.is_readable() { 385 /// let mut data = vec![0; 1024]; 386 /// // Try to read data, this may still fail with `WouldBlock` 387 /// // if the readiness event is a false positive. 388 /// match stream.try_read(&mut data) { 389 /// Ok(n) => { 390 /// println!("read {} bytes", n); 391 /// } 392 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 393 /// continue; 394 /// } 395 /// Err(e) => { 396 /// return Err(e.into()); 397 /// } 398 /// } 399 /// 400 /// } 401 /// 402 /// if ready.is_writable() { 403 /// // Try to write data, this may still fail with `WouldBlock` 404 /// // if the readiness event is a false positive. 405 /// match stream.try_write(b"hello world") { 406 /// Ok(n) => { 407 /// println!("write {} bytes", n); 408 /// } 409 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 410 /// continue 411 /// } 412 /// Err(e) => { 413 /// return Err(e.into()); 414 /// } 415 /// } 416 /// } 417 /// } 418 /// } 419 /// ``` ready(&self, interest: Interest) -> io::Result<Ready>420 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> { 421 let event = self.io.registration().readiness(interest).await?; 422 Ok(event.ready) 423 } 424 425 /// Waits for the socket to become readable. 426 /// 427 /// This function is equivalent to `ready(Interest::READABLE)` and is usually 428 /// paired with `try_read()`. 429 /// 430 /// # Cancel safety 431 /// 432 /// This method is cancel safe. 
Once a readiness event occurs, the method 433 /// will continue to return immediately until the readiness event is 434 /// consumed by an attempt to read that fails with `WouldBlock` or 435 /// `Poll::Pending`. 436 /// 437 /// # Examples 438 /// 439 /// ```no_run 440 /// use tokio::net::TcpStream; 441 /// use std::error::Error; 442 /// use std::io; 443 /// 444 /// #[tokio::main] 445 /// async fn main() -> Result<(), Box<dyn Error>> { 446 /// // Connect to a peer 447 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 448 /// 449 /// let mut msg = vec![0; 1024]; 450 /// 451 /// loop { 452 /// // Wait for the socket to be readable 453 /// stream.readable().await?; 454 /// 455 /// // Try to read data, this may still fail with `WouldBlock` 456 /// // if the readiness event is a false positive. 457 /// match stream.try_read(&mut msg) { 458 /// Ok(n) => { 459 /// msg.truncate(n); 460 /// break; 461 /// } 462 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 463 /// continue; 464 /// } 465 /// Err(e) => { 466 /// return Err(e.into()); 467 /// } 468 /// } 469 /// } 470 /// 471 /// println!("GOT = {:?}", msg); 472 /// Ok(()) 473 /// } 474 /// ``` readable(&self) -> io::Result<()>475 pub async fn readable(&self) -> io::Result<()> { 476 self.ready(Interest::READABLE).await?; 477 Ok(()) 478 } 479 480 /// Polls for read readiness. 481 /// 482 /// If the tcp stream is not currently ready for reading, this method will 483 /// store a clone of the `Waker` from the provided `Context`. When the tcp 484 /// stream becomes ready for reading, `Waker::wake` will be called on the 485 /// waker. 486 /// 487 /// Note that on multiple calls to `poll_read_ready`, `poll_read` or 488 /// `poll_peek`, only the `Waker` from the `Context` passed to the most 489 /// recent call is scheduled to receive a wakeup. (However, 490 /// `poll_write_ready` retains a second, independent waker.) 
491 /// 492 /// This function is intended for cases where creating and pinning a future 493 /// via [`readable`] is not feasible. Where possible, using [`readable`] is 494 /// preferred, as this supports polling from multiple tasks at once. 495 /// 496 /// # Return value 497 /// 498 /// The function returns: 499 /// 500 /// * `Poll::Pending` if the tcp stream is not ready for reading. 501 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading. 502 /// * `Poll::Ready(Err(e))` if an error is encountered. 503 /// 504 /// # Errors 505 /// 506 /// This function may encounter any standard I/O error except `WouldBlock`. 507 /// 508 /// [`readable`]: method@Self::readable poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>509 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 510 self.io.registration().poll_read_ready(cx).map_ok(|_| ()) 511 } 512 513 /// Tries to read data from the stream into the provided buffer, returning how 514 /// many bytes were read. 515 /// 516 /// Receives any pending data from the socket but does not wait for new data 517 /// to arrive. On success, returns the number of bytes read. Because 518 /// `try_read()` is non-blocking, the buffer does not have to be stored by 519 /// the async task and can exist entirely on the stack. 520 /// 521 /// Usually, [`readable()`] or [`ready()`] is used with this function. 522 /// 523 /// [`readable()`]: TcpStream::readable() 524 /// [`ready()`]: TcpStream::ready() 525 /// 526 /// # Return 527 /// 528 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 529 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 530 /// and will no longer yield data. If the stream is not ready to read data 531 /// `Err(io::ErrorKind::WouldBlock)` is returned. 
532 /// 533 /// # Examples 534 /// 535 /// ```no_run 536 /// use tokio::net::TcpStream; 537 /// use std::error::Error; 538 /// use std::io; 539 /// 540 /// #[tokio::main] 541 /// async fn main() -> Result<(), Box<dyn Error>> { 542 /// // Connect to a peer 543 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 544 /// 545 /// loop { 546 /// // Wait for the socket to be readable 547 /// stream.readable().await?; 548 /// 549 /// // Creating the buffer **after** the `await` prevents it from 550 /// // being stored in the async task. 551 /// let mut buf = [0; 4096]; 552 /// 553 /// // Try to read data, this may still fail with `WouldBlock` 554 /// // if the readiness event is a false positive. 555 /// match stream.try_read(&mut buf) { 556 /// Ok(0) => break, 557 /// Ok(n) => { 558 /// println!("read {} bytes", n); 559 /// } 560 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 561 /// continue; 562 /// } 563 /// Err(e) => { 564 /// return Err(e.into()); 565 /// } 566 /// } 567 /// } 568 /// 569 /// Ok(()) 570 /// } 571 /// ``` try_read(&self, buf: &mut [u8]) -> io::Result<usize>572 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> { 573 use std::io::Read; 574 575 self.io 576 .registration() 577 .try_io(Interest::READABLE, || (&*self.io).read(buf)) 578 } 579 580 /// Tries to read data from the stream into the provided buffers, returning 581 /// how many bytes were read. 582 /// 583 /// Data is copied to fill each buffer in order, with the final buffer 584 /// written to possibly being only partially filled. This method behaves 585 /// equivalently to a single call to [`try_read()`] with concatenated 586 /// buffers. 587 /// 588 /// Receives any pending data from the socket but does not wait for new data 589 /// to arrive. On success, returns the number of bytes read. Because 590 /// `try_read_vectored()` is non-blocking, the buffer does not have to be 591 /// stored by the async task and can exist entirely on the stack. 
592 /// 593 /// Usually, [`readable()`] or [`ready()`] is used with this function. 594 /// 595 /// [`try_read()`]: TcpStream::try_read() 596 /// [`readable()`]: TcpStream::readable() 597 /// [`ready()`]: TcpStream::ready() 598 /// 599 /// # Return 600 /// 601 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 602 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 603 /// and will no longer yield data. If the stream is not ready to read data 604 /// `Err(io::ErrorKind::WouldBlock)` is returned. 605 /// 606 /// # Examples 607 /// 608 /// ```no_run 609 /// use tokio::net::TcpStream; 610 /// use std::error::Error; 611 /// use std::io::{self, IoSliceMut}; 612 /// 613 /// #[tokio::main] 614 /// async fn main() -> Result<(), Box<dyn Error>> { 615 /// // Connect to a peer 616 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 617 /// 618 /// loop { 619 /// // Wait for the socket to be readable 620 /// stream.readable().await?; 621 /// 622 /// // Creating the buffer **after** the `await` prevents it from 623 /// // being stored in the async task. 624 /// let mut buf_a = [0; 512]; 625 /// let mut buf_b = [0; 1024]; 626 /// let mut bufs = [ 627 /// IoSliceMut::new(&mut buf_a), 628 /// IoSliceMut::new(&mut buf_b), 629 /// ]; 630 /// 631 /// // Try to read data, this may still fail with `WouldBlock` 632 /// // if the readiness event is a false positive. 
633 /// match stream.try_read_vectored(&mut bufs) { 634 /// Ok(0) => break, 635 /// Ok(n) => { 636 /// println!("read {} bytes", n); 637 /// } 638 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 639 /// continue; 640 /// } 641 /// Err(e) => { 642 /// return Err(e.into()); 643 /// } 644 /// } 645 /// } 646 /// 647 /// Ok(()) 648 /// } 649 /// ``` try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>650 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> { 651 use std::io::Read; 652 653 self.io 654 .registration() 655 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs)) 656 } 657 658 cfg_io_util! { 659 /// Tries to read data from the stream into the provided buffer, advancing the 660 /// buffer's internal cursor, returning how many bytes were read. 661 /// 662 /// Receives any pending data from the socket but does not wait for new data 663 /// to arrive. On success, returns the number of bytes read. Because 664 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by 665 /// the async task and can exist entirely on the stack. 666 /// 667 /// Usually, [`readable()`] or [`ready()`] is used with this function. 668 /// 669 /// [`readable()`]: TcpStream::readable() 670 /// [`ready()`]: TcpStream::ready() 671 /// 672 /// # Return 673 /// 674 /// If data is successfully read, `Ok(n)` is returned, where `n` is the 675 /// number of bytes read. `Ok(0)` indicates the stream's read half is closed 676 /// and will no longer yield data. If the stream is not ready to read data 677 /// `Err(io::ErrorKind::WouldBlock)` is returned. 
678 /// 679 /// # Examples 680 /// 681 /// ```no_run 682 /// use tokio::net::TcpStream; 683 /// use std::error::Error; 684 /// use std::io; 685 /// 686 /// #[tokio::main] 687 /// async fn main() -> Result<(), Box<dyn Error>> { 688 /// // Connect to a peer 689 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 690 /// 691 /// loop { 692 /// // Wait for the socket to be readable 693 /// stream.readable().await?; 694 /// 695 /// let mut buf = Vec::with_capacity(4096); 696 /// 697 /// // Try to read data, this may still fail with `WouldBlock` 698 /// // if the readiness event is a false positive. 699 /// match stream.try_read_buf(&mut buf) { 700 /// Ok(0) => break, 701 /// Ok(n) => { 702 /// println!("read {} bytes", n); 703 /// } 704 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 705 /// continue; 706 /// } 707 /// Err(e) => { 708 /// return Err(e.into()); 709 /// } 710 /// } 711 /// } 712 /// 713 /// Ok(()) 714 /// } 715 /// ``` 716 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> { 717 self.io.registration().try_io(Interest::READABLE, || { 718 use std::io::Read; 719 720 let dst = buf.chunk_mut(); 721 let dst = 722 unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) }; 723 724 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the 725 // buffer. 726 let n = (&*self.io).read(dst)?; 727 728 unsafe { 729 buf.advance_mut(n); 730 } 731 732 Ok(n) 733 }) 734 } 735 } 736 737 /// Waits for the socket to become writable. 738 /// 739 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually 740 /// paired with `try_write()`. 741 /// 742 /// # Cancel safety 743 /// 744 /// This method is cancel safe. Once a readiness event occurs, the method 745 /// will continue to return immediately until the readiness event is 746 /// consumed by an attempt to write that fails with `WouldBlock` or 747 /// `Poll::Pending`. 
748 /// 749 /// # Examples 750 /// 751 /// ```no_run 752 /// use tokio::net::TcpStream; 753 /// use std::error::Error; 754 /// use std::io; 755 /// 756 /// #[tokio::main] 757 /// async fn main() -> Result<(), Box<dyn Error>> { 758 /// // Connect to a peer 759 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 760 /// 761 /// loop { 762 /// // Wait for the socket to be writable 763 /// stream.writable().await?; 764 /// 765 /// // Try to write data, this may still fail with `WouldBlock` 766 /// // if the readiness event is a false positive. 767 /// match stream.try_write(b"hello world") { 768 /// Ok(n) => { 769 /// break; 770 /// } 771 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 772 /// continue; 773 /// } 774 /// Err(e) => { 775 /// return Err(e.into()); 776 /// } 777 /// } 778 /// } 779 /// 780 /// Ok(()) 781 /// } 782 /// ``` writable(&self) -> io::Result<()>783 pub async fn writable(&self) -> io::Result<()> { 784 self.ready(Interest::WRITABLE).await?; 785 Ok(()) 786 } 787 788 /// Polls for write readiness. 789 /// 790 /// If the tcp stream is not currently ready for writing, this method will 791 /// store a clone of the `Waker` from the provided `Context`. When the tcp 792 /// stream becomes ready for writing, `Waker::wake` will be called on the 793 /// waker. 794 /// 795 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only 796 /// the `Waker` from the `Context` passed to the most recent call is 797 /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a 798 /// second, independent waker.) 799 /// 800 /// This function is intended for cases where creating and pinning a future 801 /// via [`writable`] is not feasible. Where possible, using [`writable`] is 802 /// preferred, as this supports polling from multiple tasks at once. 803 /// 804 /// # Return value 805 /// 806 /// The function returns: 807 /// 808 /// * `Poll::Pending` if the tcp stream is not ready for writing. 
809 /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing. 810 /// * `Poll::Ready(Err(e))` if an error is encountered. 811 /// 812 /// # Errors 813 /// 814 /// This function may encounter any standard I/O error except `WouldBlock`. 815 /// 816 /// [`writable`]: method@Self::writable poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>817 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> { 818 self.io.registration().poll_write_ready(cx).map_ok(|_| ()) 819 } 820 821 /// Try to write a buffer to the stream, returning how many bytes were 822 /// written. 823 /// 824 /// The function will attempt to write the entire contents of `buf`, but 825 /// only part of the buffer may be written. 826 /// 827 /// This function is usually paired with `writable()`. 828 /// 829 /// # Return 830 /// 831 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 832 /// number of bytes written. If the stream is not ready to write data, 833 /// `Err(io::ErrorKind::WouldBlock)` is returned. 834 /// 835 /// # Examples 836 /// 837 /// ```no_run 838 /// use tokio::net::TcpStream; 839 /// use std::error::Error; 840 /// use std::io; 841 /// 842 /// #[tokio::main] 843 /// async fn main() -> Result<(), Box<dyn Error>> { 844 /// // Connect to a peer 845 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 846 /// 847 /// loop { 848 /// // Wait for the socket to be writable 849 /// stream.writable().await?; 850 /// 851 /// // Try to write data, this may still fail with `WouldBlock` 852 /// // if the readiness event is a false positive. 
853 /// match stream.try_write(b"hello world") { 854 /// Ok(n) => { 855 /// break; 856 /// } 857 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 858 /// continue; 859 /// } 860 /// Err(e) => { 861 /// return Err(e.into()); 862 /// } 863 /// } 864 /// } 865 /// 866 /// Ok(()) 867 /// } 868 /// ``` try_write(&self, buf: &[u8]) -> io::Result<usize>869 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> { 870 use std::io::Write; 871 872 self.io 873 .registration() 874 .try_io(Interest::WRITABLE, || (&*self.io).write(buf)) 875 } 876 877 /// Tries to write several buffers to the stream, returning how many bytes 878 /// were written. 879 /// 880 /// Data is written from each buffer in order, with the final buffer read 881 /// from possible being only partially consumed. This method behaves 882 /// equivalently to a single call to [`try_write()`] with concatenated 883 /// buffers. 884 /// 885 /// This function is usually paired with `writable()`. 886 /// 887 /// [`try_write()`]: TcpStream::try_write() 888 /// 889 /// # Return 890 /// 891 /// If data is successfully written, `Ok(n)` is returned, where `n` is the 892 /// number of bytes written. If the stream is not ready to write data, 893 /// `Err(io::ErrorKind::WouldBlock)` is returned. 894 /// 895 /// # Examples 896 /// 897 /// ```no_run 898 /// use tokio::net::TcpStream; 899 /// use std::error::Error; 900 /// use std::io; 901 /// 902 /// #[tokio::main] 903 /// async fn main() -> Result<(), Box<dyn Error>> { 904 /// // Connect to a peer 905 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 906 /// 907 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")]; 908 /// 909 /// loop { 910 /// // Wait for the socket to be writable 911 /// stream.writable().await?; 912 /// 913 /// // Try to write data, this may still fail with `WouldBlock` 914 /// // if the readiness event is a false positive. 
915 /// match stream.try_write_vectored(&bufs) { 916 /// Ok(n) => { 917 /// break; 918 /// } 919 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { 920 /// continue; 921 /// } 922 /// Err(e) => { 923 /// return Err(e.into()); 924 /// } 925 /// } 926 /// } 927 /// 928 /// Ok(()) 929 /// } 930 /// ``` try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize>931 pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> { 932 use std::io::Write; 933 934 self.io 935 .registration() 936 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs)) 937 } 938 939 /// Tries to read or write from the socket using a user-provided IO operation. 940 /// 941 /// If the socket is ready, the provided closure is called. The closure 942 /// should attempt to perform IO operation from the socket by manually 943 /// calling the appropriate syscall. If the operation fails because the 944 /// socket is not actually ready, then the closure should return a 945 /// `WouldBlock` error and the readiness flag is cleared. The return value 946 /// of the closure is then returned by `try_io`. 947 /// 948 /// If the socket is not ready, then the closure is not called 949 /// and a `WouldBlock` error is returned. 950 /// 951 /// The closure should only return a `WouldBlock` error if it has performed 952 /// an IO operation on the socket that failed due to the socket not being 953 /// ready. Returning a `WouldBlock` error in any other situation will 954 /// incorrectly clear the readiness flag, which can cause the socket to 955 /// behave incorrectly. 956 /// 957 /// The closure should not perform the IO operation using any of the methods 958 /// defined on the Tokio `TcpStream` type, as this will mess with the 959 /// readiness flag and can cause the socket to behave incorrectly. 960 /// 961 /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function. 
962 /// 963 /// [`readable()`]: TcpStream::readable() 964 /// [`writable()`]: TcpStream::writable() 965 /// [`ready()`]: TcpStream::ready() try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>966 pub fn try_io<R>( 967 &self, 968 interest: Interest, 969 f: impl FnOnce() -> io::Result<R>, 970 ) -> io::Result<R> { 971 self.io.registration().try_io(interest, f) 972 } 973 974 /// Receives data on the socket from the remote address to which it is 975 /// connected, without removing that data from the queue. On success, 976 /// returns the number of bytes peeked. 977 /// 978 /// Successive calls return the same data. This is accomplished by passing 979 /// `MSG_PEEK` as a flag to the underlying recv system call. 980 /// 981 /// # Examples 982 /// 983 /// ```no_run 984 /// use tokio::net::TcpStream; 985 /// use tokio::io::AsyncReadExt; 986 /// use std::error::Error; 987 /// 988 /// #[tokio::main] 989 /// async fn main() -> Result<(), Box<dyn Error>> { 990 /// // Connect to a peer 991 /// let mut stream = TcpStream::connect("127.0.0.1:8080").await?; 992 /// 993 /// let mut b1 = [0; 10]; 994 /// let mut b2 = [0; 10]; 995 /// 996 /// // Peek at the data 997 /// let n = stream.peek(&mut b1).await?; 998 /// 999 /// // Read the data 1000 /// assert_eq!(n, stream.read(&mut b2[..n]).await?); 1001 /// assert_eq!(&b1[..n], &b2[..n]); 1002 /// 1003 /// Ok(()) 1004 /// } 1005 /// ``` 1006 /// 1007 /// The [`read`] method is defined on the [`AsyncReadExt`] trait. 1008 /// 1009 /// [`read`]: fn@crate::io::AsyncReadExt::read 1010 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt peek(&self, buf: &mut [u8]) -> io::Result<usize>1011 pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { 1012 self.io 1013 .registration() 1014 .async_io(Interest::READABLE, || self.io.peek(buf)) 1015 .await 1016 } 1017 1018 /// Shuts down the read, write, or both halves of this connection. 
1019 /// 1020 /// This function will cause all pending and future I/O on the specified 1021 /// portions to return immediately with an appropriate value (see the 1022 /// documentation of `Shutdown`). shutdown_std(&self, how: Shutdown) -> io::Result<()>1023 pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> { 1024 self.io.shutdown(how) 1025 } 1026 1027 /// Gets the value of the `TCP_NODELAY` option on this socket. 1028 /// 1029 /// For more information about this option, see [`set_nodelay`]. 1030 /// 1031 /// [`set_nodelay`]: TcpStream::set_nodelay 1032 /// 1033 /// # Examples 1034 /// 1035 /// ```no_run 1036 /// use tokio::net::TcpStream; 1037 /// 1038 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1039 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1040 /// 1041 /// println!("{:?}", stream.nodelay()?); 1042 /// # Ok(()) 1043 /// # } 1044 /// ``` nodelay(&self) -> io::Result<bool>1045 pub fn nodelay(&self) -> io::Result<bool> { 1046 self.io.nodelay() 1047 } 1048 1049 /// Sets the value of the `TCP_NODELAY` option on this socket. 1050 /// 1051 /// If set, this option disables the Nagle algorithm. This means that 1052 /// segments are always sent as soon as possible, even if there is only a 1053 /// small amount of data. When not set, data is buffered until there is a 1054 /// sufficient amount to send out, thereby avoiding the frequent sending of 1055 /// small packets. 
1056 /// 1057 /// # Examples 1058 /// 1059 /// ```no_run 1060 /// use tokio::net::TcpStream; 1061 /// 1062 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1063 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1064 /// 1065 /// stream.set_nodelay(true)?; 1066 /// # Ok(()) 1067 /// # } 1068 /// ``` set_nodelay(&self, nodelay: bool) -> io::Result<()>1069 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { 1070 self.io.set_nodelay(nodelay) 1071 } 1072 1073 /// Reads the linger duration for this socket by getting the `SO_LINGER` 1074 /// option. 1075 /// 1076 /// For more information about this option, see [`set_linger`]. 1077 /// 1078 /// [`set_linger`]: TcpStream::set_linger 1079 /// 1080 /// # Examples 1081 /// 1082 /// ```no_run 1083 /// use tokio::net::TcpStream; 1084 /// 1085 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1086 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1087 /// 1088 /// println!("{:?}", stream.linger()?); 1089 /// # Ok(()) 1090 /// # } 1091 /// ``` linger(&self) -> io::Result<Option<Duration>>1092 pub fn linger(&self) -> io::Result<Option<Duration>> { 1093 let mio_socket = std::mem::ManuallyDrop::new(self.to_mio()); 1094 1095 mio_socket.get_linger() 1096 } 1097 1098 /// Sets the linger duration of this socket by setting the SO_LINGER option. 1099 /// 1100 /// This option controls the action taken when a stream has unsent messages and the stream is 1101 /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the 1102 /// data or until the time expires. 1103 /// 1104 /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a 1105 /// way that allows the process to continue as quickly as possible. 
1106 /// 1107 /// # Examples 1108 /// 1109 /// ```no_run 1110 /// use tokio::net::TcpStream; 1111 /// 1112 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1113 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1114 /// 1115 /// stream.set_linger(None)?; 1116 /// # Ok(()) 1117 /// # } 1118 /// ``` set_linger(&self, dur: Option<Duration>) -> io::Result<()>1119 pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> { 1120 let mio_socket = std::mem::ManuallyDrop::new(self.to_mio()); 1121 1122 mio_socket.set_linger(dur) 1123 } 1124 to_mio(&self) -> mio::net::TcpSocket1125 fn to_mio(&self) -> mio::net::TcpSocket { 1126 #[cfg(windows)] 1127 { 1128 use std::os::windows::io::{AsRawSocket, FromRawSocket}; 1129 unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) } 1130 } 1131 1132 #[cfg(unix)] 1133 { 1134 use std::os::unix::io::{AsRawFd, FromRawFd}; 1135 unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) } 1136 } 1137 } 1138 1139 /// Gets the value of the `IP_TTL` option for this socket. 1140 /// 1141 /// For more information about this option, see [`set_ttl`]. 1142 /// 1143 /// [`set_ttl`]: TcpStream::set_ttl 1144 /// 1145 /// # Examples 1146 /// 1147 /// ```no_run 1148 /// use tokio::net::TcpStream; 1149 /// 1150 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1151 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1152 /// 1153 /// println!("{:?}", stream.ttl()?); 1154 /// # Ok(()) 1155 /// # } 1156 /// ``` ttl(&self) -> io::Result<u32>1157 pub fn ttl(&self) -> io::Result<u32> { 1158 self.io.ttl() 1159 } 1160 1161 /// Sets the value for the `IP_TTL` option on this socket. 1162 /// 1163 /// This value sets the time-to-live field that is used in every packet sent 1164 /// from this socket. 
1165 /// 1166 /// # Examples 1167 /// 1168 /// ```no_run 1169 /// use tokio::net::TcpStream; 1170 /// 1171 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> { 1172 /// let stream = TcpStream::connect("127.0.0.1:8080").await?; 1173 /// 1174 /// stream.set_ttl(123)?; 1175 /// # Ok(()) 1176 /// # } 1177 /// ``` set_ttl(&self, ttl: u32) -> io::Result<()>1178 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 1179 self.io.set_ttl(ttl) 1180 } 1181 1182 // These lifetime markers also appear in the generated documentation, and make 1183 // it more clear that this is a *borrowed* split. 1184 #[allow(clippy::needless_lifetimes)] 1185 /// Splits a `TcpStream` into a read half and a write half, which can be used 1186 /// to read and write the stream concurrently. 1187 /// 1188 /// This method is more efficient than [`into_split`], but the halves cannot be 1189 /// moved into independently spawned tasks. 1190 /// 1191 /// [`into_split`]: TcpStream::into_split() split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)1192 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) { 1193 split(self) 1194 } 1195 1196 /// Splits a `TcpStream` into a read half and a write half, which can be used 1197 /// to read and write the stream concurrently. 1198 /// 1199 /// Unlike [`split`], the owned halves can be moved to separate tasks, however 1200 /// this comes at the cost of a heap allocation. 1201 /// 1202 /// **Note:** Dropping the write half will shut down the write half of the TCP 1203 /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`. 
///
/// [`split`]: TcpStream::split()
/// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
    // Ownership of the stream moves into the free function `split_owned`.
    let (read_half, write_half) = split_owned(self);
    (read_half, write_half)
}

// == Poll IO functions that take `&self` ==
//
// To read or write without mutable access to the `TcpStream`, combine the
// `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
// `try_write` methods.

/// Polls a read into `buf` through a shared reference to the stream.
///
/// The `AsyncRead` implementation for `TcpStream` forwards to this method.
pub(crate) fn poll_read_priv(
    &self,
    cx: &mut Context<'_>,
    buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
    let io = &self.io;
    // SAFETY: `TcpStream::read` correctly handles reads into uninitialized memory
    unsafe { io.poll_read(cx, buf) }
}

/// Polls a write of `buf` through a shared reference to the stream.
///
/// The `AsyncWrite` implementation for `TcpStream` forwards to this method.
pub(super) fn poll_write_priv(
    &self,
    cx: &mut Context<'_>,
    buf: &[u8],
) -> Poll<io::Result<usize>> {
    let io = &self.io;
    io.poll_write(cx, buf)
}

/// Polls a vectored write of `bufs` through a shared reference to the
/// stream.
///
/// The `AsyncWrite` implementation for `TcpStream` forwards to this method.
pub(super) fn poll_write_vectored_priv(
    &self,
    cx: &mut Context<'_>,
    bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
    let io = &self.io;
    io.poll_write_vectored(cx, bufs)
}
}

impl TryFrom<std::net::TcpStream> for TcpStream {
    type Error = io::Error;

    /// Consumes stream, returning the tokio I/O object.
    ///
    /// This is equivalent to
    /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
    fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
        TcpStream::from_std(stream)
    }
}

// ===== impl Read / Write =====

impl AsyncRead for TcpStream {
    /// Forwards to [`TcpStream::poll_read_priv`], which only needs `&self`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let this: &Self = &*self;
        this.poll_read_priv(cx, buf)
    }
}

impl AsyncWrite for TcpStream {
    /// Forwards to [`TcpStream::poll_write_priv`], which only needs `&self`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this: &Self = &*self;
        this.poll_write_priv(cx, buf)
    }

    /// Forwards to [`TcpStream::poll_write_vectored_priv`], which only needs
    /// `&self`.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        let this: &Self = &*self;
        this.poll_write_vectored_priv(cx, bufs)
    }

    /// Always `true`: vectored writes are forwarded by `poll_write_vectored`
    /// rather than being emulated.
    fn is_write_vectored(&self) -> bool {
        true
    }

    /// Completes immediately with `Ok(())`.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        // tcp flush is a no-op
        Poll::Ready(Ok(()))
    }

    /// Shuts down the write direction of the stream and completes
    /// immediately with the outcome of that call.
    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Success or failure, the result is ready right away; this never
        // returns `Poll::Pending`.
        Poll::Ready(self.shutdown_std(Shutdown::Write))
    }
}

impl fmt::Debug for TcpStream {
    /// Formats the stream by delegating to the `io` field's formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.io.fmt(f)
    }
}

#[cfg(unix)] 1307 mod sys { 1308 use super::TcpStream; 1309 use std::os::unix::prelude::*; 1310 1311 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd1312 fn as_raw_fd(&self) -> RawFd { 1313 self.io.as_raw_fd() 1314 } 1315 } 1316 } 1317 1318 #[cfg(windows)] 1319 mod sys { 1320 use super::TcpStream; 1321 use std::os::windows::prelude::*; 1322 1323 impl AsRawSocket for TcpStream { as_raw_socket(&self) -> RawSocket1324 fn as_raw_socket(&self) -> RawSocket { 1325 self.io.as_raw_socket() 1326 } 1327 } 1328 } 1329