• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 cfg_not_wasi! {
2     use crate::future::poll_fn;
3     use crate::net::{to_socket_addrs, ToSocketAddrs};
4     use std::time::Duration;
5 }
6 
7 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
8 use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
9 use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
10 
11 use std::fmt;
12 use std::io;
13 use std::net::{Shutdown, SocketAddr};
14 use std::pin::Pin;
15 use std::task::{Context, Poll};
16 
17 cfg_io_util! {
18     use bytes::BufMut;
19 }
20 
21 cfg_net! {
22     /// A TCP stream between a local and a remote socket.
23     ///
24     /// A TCP stream can either be created by connecting to an endpoint, via the
25     /// [`connect`] method, or by [accepting] a connection from a [listener]. A
26     /// TCP stream can also be created via the [`TcpSocket`] type.
27     ///
28     /// Reading and writing to a `TcpStream` is usually done using the
29     /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
30     /// traits.
31     ///
32     /// [`connect`]: method@TcpStream::connect
33     /// [accepting]: method@crate::net::TcpListener::accept
34     /// [listener]: struct@crate::net::TcpListener
35     /// [`TcpSocket`]: struct@crate::net::TcpSocket
36     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
37     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
38     ///
39     /// # Examples
40     ///
41     /// ```no_run
42     /// use tokio::net::TcpStream;
43     /// use tokio::io::AsyncWriteExt;
44     /// use std::error::Error;
45     ///
46     /// #[tokio::main]
47     /// async fn main() -> Result<(), Box<dyn Error>> {
48     ///     // Connect to a peer
49     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
50     ///
51     ///     // Write some data.
52     ///     stream.write_all(b"hello world!").await?;
53     ///
54     ///     Ok(())
55     /// }
56     /// ```
57     ///
58     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
59     ///
60     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
61     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
62     ///
63     /// To shut down the stream in the write direction, you can call the
64     /// [`shutdown()`] method. This will cause the other peer to receive a read of
65     /// length 0, indicating that no more data will be sent. This only closes
66     /// the stream in one direction.
67     ///
68     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
69     pub struct TcpStream {
70         io: PollEvented<mio::net::TcpStream>,
71     }
72 }
73 
74 impl TcpStream {
75     cfg_not_wasi! {
76         /// Opens a TCP connection to a remote host.
77         ///
78         /// `addr` is an address of the remote host. Anything which implements the
79         /// [`ToSocketAddrs`] trait can be supplied as the address.  If `addr`
80         /// yields multiple addresses, connect will be attempted with each of the
81         /// addresses until a connection is successful. If none of the addresses
82         /// result in a successful connection, the error returned from the last
83         /// connection attempt (the last address) is returned.
84         ///
85         /// To configure the socket before connecting, you can use the [`TcpSocket`]
86         /// type.
87         ///
88         /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
89         /// [`TcpSocket`]: struct@crate::net::TcpSocket
90         ///
91         /// # Examples
92         ///
93         /// ```no_run
94         /// use tokio::net::TcpStream;
95         /// use tokio::io::AsyncWriteExt;
96         /// use std::error::Error;
97         ///
98         /// #[tokio::main]
99         /// async fn main() -> Result<(), Box<dyn Error>> {
100         ///     // Connect to a peer
101         ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
102         ///
103         ///     // Write some data.
104         ///     stream.write_all(b"hello world!").await?;
105         ///
106         ///     Ok(())
107         /// }
108         /// ```
109         ///
110         /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
111         ///
112         /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
113         /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
114         pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
115             let addrs = to_socket_addrs(addr).await?;
116 
117             let mut last_err = None;
118 
119             for addr in addrs {
120                 match TcpStream::connect_addr(addr).await {
121                     Ok(stream) => return Ok(stream),
122                     Err(e) => last_err = Some(e),
123                 }
124             }
125 
126             Err(last_err.unwrap_or_else(|| {
127                 io::Error::new(
128                     io::ErrorKind::InvalidInput,
129                     "could not resolve to any address",
130                 )
131             }))
132         }
133 
134         /// Establishes a connection to the specified `addr`.
135         async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
136             let sys = mio::net::TcpStream::connect(addr)?;
137             TcpStream::connect_mio(sys).await
138         }
139 
140         pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
141             let stream = TcpStream::new(sys)?;
142 
143             // Once we've connected, wait for the stream to be writable as
144             // that's when the actual connection has been initiated. Once we're
145             // writable we check for `take_socket_error` to see if the connect
146             // actually hit an error or not.
147             //
148             // If all that succeeded then we ship everything on up.
149             poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
150 
151             if let Some(e) = stream.io.take_error()? {
152                 return Err(e);
153             }
154 
155             Ok(stream)
156         }
157     }
158 
new(connected: mio::net::TcpStream) -> io::Result<TcpStream>159     pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
160         let io = PollEvented::new(connected)?;
161         Ok(TcpStream { io })
162     }
163 
164     /// Creates new `TcpStream` from a `std::net::TcpStream`.
165     ///
166     /// This function is intended to be used to wrap a TCP stream from the
167     /// standard library in the Tokio equivalent.
168     ///
169     /// # Notes
170     ///
171     /// The caller is responsible for ensuring that the stream is in
172     /// non-blocking mode. Otherwise all I/O operations on the stream
173     /// will block the thread, which will cause unexpected behavior.
174     /// Non-blocking mode can be set using [`set_nonblocking`].
175     ///
176     /// [`set_nonblocking`]: std::net::TcpStream::set_nonblocking
177     ///
178     /// # Examples
179     ///
180     /// ```rust,no_run
181     /// use std::error::Error;
182     /// use tokio::net::TcpStream;
183     ///
184     /// #[tokio::main]
185     /// async fn main() -> Result<(), Box<dyn Error>> {
186     ///     let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
187     ///     std_stream.set_nonblocking(true)?;
188     ///     let stream = TcpStream::from_std(std_stream)?;
189     ///     Ok(())
190     /// }
191     /// ```
192     ///
193     /// # Panics
194     ///
195     /// This function panics if it is not called from within a runtime with
196     /// IO enabled.
197     ///
198     /// The runtime is usually set implicitly when this function is called
199     /// from a future driven by a tokio runtime, otherwise runtime can be set
200     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
201     #[track_caller]
from_std(stream: std::net::TcpStream) -> io::Result<TcpStream>202     pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
203         let io = mio::net::TcpStream::from_std(stream);
204         let io = PollEvented::new(io)?;
205         Ok(TcpStream { io })
206     }
207 
208     /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
209     ///
210     /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
211     /// Use [`set_nonblocking`] to change the blocking mode if needed.
212     ///
213     /// # Examples
214     ///
215     /// ```
216     /// use std::error::Error;
217     /// use std::io::Read;
218     /// use tokio::net::TcpListener;
219     /// # use tokio::net::TcpStream;
220     /// # use tokio::io::AsyncWriteExt;
221     ///
222     /// #[tokio::main]
223     /// async fn main() -> Result<(), Box<dyn Error>> {
224     ///     let mut data = [0u8; 12];
225     ///     let listener = TcpListener::bind("127.0.0.1:34254").await?;
226     /// #   let handle = tokio::spawn(async {
227     /// #       let mut stream: TcpStream = TcpStream::connect("127.0.0.1:34254").await.unwrap();
228     /// #       stream.write(b"Hello world!").await.unwrap();
229     /// #   });
230     ///     let (tokio_tcp_stream, _) = listener.accept().await?;
231     ///     let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
232     /// #   handle.await.expect("The task being joined has panicked");
233     ///     std_tcp_stream.set_nonblocking(false)?;
234     ///     std_tcp_stream.read_exact(&mut data)?;
235     /// #   assert_eq!(b"Hello world!", &data);
236     ///     Ok(())
237     /// }
238     /// ```
239     /// [`tokio::net::TcpStream`]: TcpStream
240     /// [`std::net::TcpStream`]: std::net::TcpStream
241     /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
into_std(self) -> io::Result<std::net::TcpStream>242     pub fn into_std(self) -> io::Result<std::net::TcpStream> {
243         #[cfg(unix)]
244         {
245             use std::os::unix::io::{FromRawFd, IntoRawFd};
246             self.io
247                 .into_inner()
248                 .map(|io| io.into_raw_fd())
249                 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
250         }
251 
252         #[cfg(windows)]
253         {
254             use std::os::windows::io::{FromRawSocket, IntoRawSocket};
255             self.io
256                 .into_inner()
257                 .map(|io| io.into_raw_socket())
258                 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
259         }
260 
261         #[cfg(target_os = "wasi")]
262         {
263             use std::os::wasi::io::{FromRawFd, IntoRawFd};
264             self.io
265                 .into_inner()
266                 .map(|io| io.into_raw_fd())
267                 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
268         }
269     }
270 
271     /// Returns the local address that this stream is bound to.
272     ///
273     /// # Examples
274     ///
275     /// ```no_run
276     /// use tokio::net::TcpStream;
277     ///
278     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
279     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
280     ///
281     /// println!("{:?}", stream.local_addr()?);
282     /// # Ok(())
283     /// # }
284     /// ```
local_addr(&self) -> io::Result<SocketAddr>285     pub fn local_addr(&self) -> io::Result<SocketAddr> {
286         self.io.local_addr()
287     }
288 
289     /// Returns the value of the `SO_ERROR` option.
take_error(&self) -> io::Result<Option<io::Error>>290     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
291         self.io.take_error()
292     }
293 
294     /// Returns the remote address that this stream is connected to.
295     ///
296     /// # Examples
297     ///
298     /// ```no_run
299     /// use tokio::net::TcpStream;
300     ///
301     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
302     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
303     ///
304     /// println!("{:?}", stream.peer_addr()?);
305     /// # Ok(())
306     /// # }
307     /// ```
peer_addr(&self) -> io::Result<SocketAddr>308     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
309         self.io.peer_addr()
310     }
311 
312     /// Attempts to receive data on the socket, without removing that data from
313     /// the queue, registering the current task for wakeup if data is not yet
314     /// available.
315     ///
316     /// Note that on multiple calls to `poll_peek`, `poll_read` or
317     /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
318     /// most recent call is scheduled to receive a wakeup. (However,
319     /// `poll_write` retains a second, independent waker.)
320     ///
321     /// # Return value
322     ///
323     /// The function returns:
324     ///
325     /// * `Poll::Pending` if data is not yet available.
326     /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
327     /// * `Poll::Ready(Err(e))` if an error is encountered.
328     ///
329     /// # Errors
330     ///
331     /// This function may encounter any standard I/O error except `WouldBlock`.
332     ///
333     /// # Examples
334     ///
335     /// ```no_run
336     /// use tokio::io::{self, ReadBuf};
337     /// use tokio::net::TcpStream;
338     ///
339     /// use futures::future::poll_fn;
340     ///
341     /// #[tokio::main]
342     /// async fn main() -> io::Result<()> {
343     ///     let stream = TcpStream::connect("127.0.0.1:8000").await?;
344     ///     let mut buf = [0; 10];
345     ///     let mut buf = ReadBuf::new(&mut buf);
346     ///
347     ///     poll_fn(|cx| {
348     ///         stream.poll_peek(cx, &mut buf)
349     ///     }).await?;
350     ///
351     ///     Ok(())
352     /// }
353     /// ```
poll_peek( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<usize>>354     pub fn poll_peek(
355         &self,
356         cx: &mut Context<'_>,
357         buf: &mut ReadBuf<'_>,
358     ) -> Poll<io::Result<usize>> {
359         loop {
360             let ev = ready!(self.io.registration().poll_read_ready(cx))?;
361 
362             let b = unsafe {
363                 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
364             };
365 
366             match self.io.peek(b) {
367                 Ok(ret) => {
368                     unsafe { buf.assume_init(ret) };
369                     buf.advance(ret);
370                     return Poll::Ready(Ok(ret));
371                 }
372                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
373                     self.io.registration().clear_readiness(ev);
374                 }
375                 Err(e) => return Poll::Ready(Err(e)),
376             }
377         }
378     }
379 
380     /// Waits for any of the requested ready states.
381     ///
382     /// This function is usually paired with `try_read()` or `try_write()`. It
383     /// can be used to concurrently read / write to the same socket on a single
384     /// task without splitting the socket.
385     ///
386     /// The function may complete without the socket being ready. This is a
387     /// false-positive and attempting an operation will return with
388     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
389     /// [`Ready`] set, so you should always check the returned value and possibly
390     /// wait again if the requested states are not set.
391     ///
392     /// # Cancel safety
393     ///
394     /// This method is cancel safe. Once a readiness event occurs, the method
395     /// will continue to return immediately until the readiness event is
396     /// consumed by an attempt to read or write that fails with `WouldBlock` or
397     /// `Poll::Pending`.
398     ///
399     /// # Examples
400     ///
401     /// Concurrently read and write to the stream on the same task without
402     /// splitting.
403     ///
404     /// ```no_run
405     /// use tokio::io::Interest;
406     /// use tokio::net::TcpStream;
407     /// use std::error::Error;
408     /// use std::io;
409     ///
410     /// #[tokio::main]
411     /// async fn main() -> Result<(), Box<dyn Error>> {
412     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
413     ///
414     ///     loop {
415     ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
416     ///
417     ///         if ready.is_readable() {
418     ///             let mut data = vec![0; 1024];
419     ///             // Try to read data, this may still fail with `WouldBlock`
420     ///             // if the readiness event is a false positive.
421     ///             match stream.try_read(&mut data) {
422     ///                 Ok(n) => {
423     ///                     println!("read {} bytes", n);
424     ///                 }
425     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
426     ///                     continue;
427     ///                 }
428     ///                 Err(e) => {
429     ///                     return Err(e.into());
430     ///                 }
431     ///             }
432     ///
433     ///         }
434     ///
435     ///         if ready.is_writable() {
436     ///             // Try to write data, this may still fail with `WouldBlock`
437     ///             // if the readiness event is a false positive.
438     ///             match stream.try_write(b"hello world") {
439     ///                 Ok(n) => {
440     ///                     println!("write {} bytes", n);
441     ///                 }
442     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
443     ///                     continue
444     ///                 }
445     ///                 Err(e) => {
446     ///                     return Err(e.into());
447     ///                 }
448     ///             }
449     ///         }
450     ///     }
451     /// }
452     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>453     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
454         let event = self.io.registration().readiness(interest).await?;
455         Ok(event.ready)
456     }
457 
458     /// Waits for the socket to become readable.
459     ///
460     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
461     /// paired with `try_read()`.
462     ///
463     /// # Cancel safety
464     ///
465     /// This method is cancel safe. Once a readiness event occurs, the method
466     /// will continue to return immediately until the readiness event is
467     /// consumed by an attempt to read that fails with `WouldBlock` or
468     /// `Poll::Pending`.
469     ///
470     /// # Examples
471     ///
472     /// ```no_run
473     /// use tokio::net::TcpStream;
474     /// use std::error::Error;
475     /// use std::io;
476     ///
477     /// #[tokio::main]
478     /// async fn main() -> Result<(), Box<dyn Error>> {
479     ///     // Connect to a peer
480     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
481     ///
482     ///     let mut msg = vec![0; 1024];
483     ///
484     ///     loop {
485     ///         // Wait for the socket to be readable
486     ///         stream.readable().await?;
487     ///
488     ///         // Try to read data, this may still fail with `WouldBlock`
489     ///         // if the readiness event is a false positive.
490     ///         match stream.try_read(&mut msg) {
491     ///             Ok(n) => {
492     ///                 msg.truncate(n);
493     ///                 break;
494     ///             }
495     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
496     ///                 continue;
497     ///             }
498     ///             Err(e) => {
499     ///                 return Err(e.into());
500     ///             }
501     ///         }
502     ///     }
503     ///
504     ///     println!("GOT = {:?}", msg);
505     ///     Ok(())
506     /// }
507     /// ```
readable(&self) -> io::Result<()>508     pub async fn readable(&self) -> io::Result<()> {
509         self.ready(Interest::READABLE).await?;
510         Ok(())
511     }
512 
513     /// Polls for read readiness.
514     ///
515     /// If the tcp stream is not currently ready for reading, this method will
516     /// store a clone of the `Waker` from the provided `Context`. When the tcp
517     /// stream becomes ready for reading, `Waker::wake` will be called on the
518     /// waker.
519     ///
520     /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
521     /// `poll_peek`, only the `Waker` from the `Context` passed to the most
522     /// recent call is scheduled to receive a wakeup. (However,
523     /// `poll_write_ready` retains a second, independent waker.)
524     ///
525     /// This function is intended for cases where creating and pinning a future
526     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
527     /// preferred, as this supports polling from multiple tasks at once.
528     ///
529     /// # Return value
530     ///
531     /// The function returns:
532     ///
533     /// * `Poll::Pending` if the tcp stream is not ready for reading.
534     /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
535     /// * `Poll::Ready(Err(e))` if an error is encountered.
536     ///
537     /// # Errors
538     ///
539     /// This function may encounter any standard I/O error except `WouldBlock`.
540     ///
541     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>542     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
543         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
544     }
545 
546     /// Tries to read data from the stream into the provided buffer, returning how
547     /// many bytes were read.
548     ///
549     /// Receives any pending data from the socket but does not wait for new data
550     /// to arrive. On success, returns the number of bytes read. Because
551     /// `try_read()` is non-blocking, the buffer does not have to be stored by
552     /// the async task and can exist entirely on the stack.
553     ///
554     /// Usually, [`readable()`] or [`ready()`] is used with this function.
555     ///
556     /// [`readable()`]: TcpStream::readable()
557     /// [`ready()`]: TcpStream::ready()
558     ///
559     /// # Return
560     ///
561     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
562     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
563     ///
564     /// 1. The stream's read half is closed and will no longer yield data.
565     /// 2. The specified buffer was 0 bytes in length.
566     ///
567     /// If the stream is not ready to read data,
568     /// `Err(io::ErrorKind::WouldBlock)` is returned.
569     ///
570     /// # Examples
571     ///
572     /// ```no_run
573     /// use tokio::net::TcpStream;
574     /// use std::error::Error;
575     /// use std::io;
576     ///
577     /// #[tokio::main]
578     /// async fn main() -> Result<(), Box<dyn Error>> {
579     ///     // Connect to a peer
580     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
581     ///
582     ///     loop {
583     ///         // Wait for the socket to be readable
584     ///         stream.readable().await?;
585     ///
586     ///         // Creating the buffer **after** the `await` prevents it from
587     ///         // being stored in the async task.
588     ///         let mut buf = [0; 4096];
589     ///
590     ///         // Try to read data, this may still fail with `WouldBlock`
591     ///         // if the readiness event is a false positive.
592     ///         match stream.try_read(&mut buf) {
593     ///             Ok(0) => break,
594     ///             Ok(n) => {
595     ///                 println!("read {} bytes", n);
596     ///             }
597     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
598     ///                 continue;
599     ///             }
600     ///             Err(e) => {
601     ///                 return Err(e.into());
602     ///             }
603     ///         }
604     ///     }
605     ///
606     ///     Ok(())
607     /// }
608     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>609     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
610         use std::io::Read;
611 
612         self.io
613             .registration()
614             .try_io(Interest::READABLE, || (&*self.io).read(buf))
615     }
616 
617     /// Tries to read data from the stream into the provided buffers, returning
618     /// how many bytes were read.
619     ///
620     /// Data is copied to fill each buffer in order, with the final buffer
621     /// written to possibly being only partially filled. This method behaves
622     /// equivalently to a single call to [`try_read()`] with concatenated
623     /// buffers.
624     ///
625     /// Receives any pending data from the socket but does not wait for new data
626     /// to arrive. On success, returns the number of bytes read. Because
627     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
628     /// stored by the async task and can exist entirely on the stack.
629     ///
630     /// Usually, [`readable()`] or [`ready()`] is used with this function.
631     ///
632     /// [`try_read()`]: TcpStream::try_read()
633     /// [`readable()`]: TcpStream::readable()
634     /// [`ready()`]: TcpStream::ready()
635     ///
636     /// # Return
637     ///
638     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
639     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
640     /// and will no longer yield data. If the stream is not ready to read data
641     /// `Err(io::ErrorKind::WouldBlock)` is returned.
642     ///
643     /// # Examples
644     ///
645     /// ```no_run
646     /// use tokio::net::TcpStream;
647     /// use std::error::Error;
648     /// use std::io::{self, IoSliceMut};
649     ///
650     /// #[tokio::main]
651     /// async fn main() -> Result<(), Box<dyn Error>> {
652     ///     // Connect to a peer
653     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
654     ///
655     ///     loop {
656     ///         // Wait for the socket to be readable
657     ///         stream.readable().await?;
658     ///
659     ///         // Creating the buffer **after** the `await` prevents it from
660     ///         // being stored in the async task.
661     ///         let mut buf_a = [0; 512];
662     ///         let mut buf_b = [0; 1024];
663     ///         let mut bufs = [
664     ///             IoSliceMut::new(&mut buf_a),
665     ///             IoSliceMut::new(&mut buf_b),
666     ///         ];
667     ///
668     ///         // Try to read data, this may still fail with `WouldBlock`
669     ///         // if the readiness event is a false positive.
670     ///         match stream.try_read_vectored(&mut bufs) {
671     ///             Ok(0) => break,
672     ///             Ok(n) => {
673     ///                 println!("read {} bytes", n);
674     ///             }
675     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
676     ///                 continue;
677     ///             }
678     ///             Err(e) => {
679     ///                 return Err(e.into());
680     ///             }
681     ///         }
682     ///     }
683     ///
684     ///     Ok(())
685     /// }
686     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>687     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
688         use std::io::Read;
689 
690         self.io
691             .registration()
692             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
693     }
694 
695     cfg_io_util! {
696         /// Tries to read data from the stream into the provided buffer, advancing the
697         /// buffer's internal cursor, returning how many bytes were read.
698         ///
699         /// Receives any pending data from the socket but does not wait for new data
700         /// to arrive. On success, returns the number of bytes read. Because
701         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
702         /// the async task and can exist entirely on the stack.
703         ///
704         /// Usually, [`readable()`] or [`ready()`] is used with this function.
705         ///
706         /// [`readable()`]: TcpStream::readable()
707         /// [`ready()`]: TcpStream::ready()
708         ///
709         /// # Return
710         ///
711         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
712         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
713         /// and will no longer yield data. If the stream is not ready to read data
714         /// `Err(io::ErrorKind::WouldBlock)` is returned.
715         ///
716         /// # Examples
717         ///
718         /// ```no_run
719         /// use tokio::net::TcpStream;
720         /// use std::error::Error;
721         /// use std::io;
722         ///
723         /// #[tokio::main]
724         /// async fn main() -> Result<(), Box<dyn Error>> {
725         ///     // Connect to a peer
726         ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
727         ///
728         ///     loop {
729         ///         // Wait for the socket to be readable
730         ///         stream.readable().await?;
731         ///
732         ///         let mut buf = Vec::with_capacity(4096);
733         ///
734         ///         // Try to read data, this may still fail with `WouldBlock`
735         ///         // if the readiness event is a false positive.
736         ///         match stream.try_read_buf(&mut buf) {
737         ///             Ok(0) => break,
738         ///             Ok(n) => {
739         ///                 println!("read {} bytes", n);
740         ///             }
741         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
742         ///                 continue;
743         ///             }
744         ///             Err(e) => {
745         ///                 return Err(e.into());
746         ///             }
747         ///         }
748         ///     }
749         ///
750         ///     Ok(())
751         /// }
752         /// ```
753         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
754             self.io.registration().try_io(Interest::READABLE, || {
755                 use std::io::Read;
756 
757                 let dst = buf.chunk_mut();
758                 let dst =
759                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
760 
761                 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
762                 // buffer.
763                 let n = (&*self.io).read(dst)?;
764 
765                 unsafe {
766                     buf.advance_mut(n);
767                 }
768 
769                 Ok(n)
770             })
771         }
772     }
773 
774     /// Waits for the socket to become writable.
775     ///
776     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
777     /// paired with `try_write()`.
778     ///
779     /// # Cancel safety
780     ///
781     /// This method is cancel safe. Once a readiness event occurs, the method
782     /// will continue to return immediately until the readiness event is
783     /// consumed by an attempt to write that fails with `WouldBlock` or
784     /// `Poll::Pending`.
785     ///
786     /// # Examples
787     ///
788     /// ```no_run
789     /// use tokio::net::TcpStream;
790     /// use std::error::Error;
791     /// use std::io;
792     ///
793     /// #[tokio::main]
794     /// async fn main() -> Result<(), Box<dyn Error>> {
795     ///     // Connect to a peer
796     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
797     ///
798     ///     loop {
799     ///         // Wait for the socket to be writable
800     ///         stream.writable().await?;
801     ///
802     ///         // Try to write data, this may still fail with `WouldBlock`
803     ///         // if the readiness event is a false positive.
804     ///         match stream.try_write(b"hello world") {
805     ///             Ok(n) => {
806     ///                 break;
807     ///             }
808     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
809     ///                 continue;
810     ///             }
811     ///             Err(e) => {
812     ///                 return Err(e.into());
813     ///             }
814     ///         }
815     ///     }
816     ///
817     ///     Ok(())
818     /// }
819     /// ```
writable(&self) -> io::Result<()>820     pub async fn writable(&self) -> io::Result<()> {
821         self.ready(Interest::WRITABLE).await?;
822         Ok(())
823     }
824 
825     /// Polls for write readiness.
826     ///
827     /// If the tcp stream is not currently ready for writing, this method will
828     /// store a clone of the `Waker` from the provided `Context`. When the tcp
829     /// stream becomes ready for writing, `Waker::wake` will be called on the
830     /// waker.
831     ///
832     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
833     /// the `Waker` from the `Context` passed to the most recent call is
834     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
835     /// second, independent waker.)
836     ///
837     /// This function is intended for cases where creating and pinning a future
838     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
839     /// preferred, as this supports polling from multiple tasks at once.
840     ///
841     /// # Return value
842     ///
843     /// The function returns:
844     ///
845     /// * `Poll::Pending` if the tcp stream is not ready for writing.
846     /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
847     /// * `Poll::Ready(Err(e))` if an error is encountered.
848     ///
849     /// # Errors
850     ///
851     /// This function may encounter any standard I/O error except `WouldBlock`.
852     ///
853     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>854     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
855         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
856     }
857 
858     /// Try to write a buffer to the stream, returning how many bytes were
859     /// written.
860     ///
861     /// The function will attempt to write the entire contents of `buf`, but
862     /// only part of the buffer may be written.
863     ///
864     /// This function is usually paired with `writable()`.
865     ///
866     /// # Return
867     ///
868     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
869     /// number of bytes written. If the stream is not ready to write data,
870     /// `Err(io::ErrorKind::WouldBlock)` is returned.
871     ///
872     /// # Examples
873     ///
874     /// ```no_run
875     /// use tokio::net::TcpStream;
876     /// use std::error::Error;
877     /// use std::io;
878     ///
879     /// #[tokio::main]
880     /// async fn main() -> Result<(), Box<dyn Error>> {
881     ///     // Connect to a peer
882     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
883     ///
884     ///     loop {
885     ///         // Wait for the socket to be writable
886     ///         stream.writable().await?;
887     ///
888     ///         // Try to write data, this may still fail with `WouldBlock`
889     ///         // if the readiness event is a false positive.
890     ///         match stream.try_write(b"hello world") {
891     ///             Ok(n) => {
892     ///                 break;
893     ///             }
894     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
895     ///                 continue;
896     ///             }
897     ///             Err(e) => {
898     ///                 return Err(e.into());
899     ///             }
900     ///         }
901     ///     }
902     ///
903     ///     Ok(())
904     /// }
905     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>906     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
907         use std::io::Write;
908 
909         self.io
910             .registration()
911             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
912     }
913 
914     /// Tries to write several buffers to the stream, returning how many bytes
915     /// were written.
916     ///
917     /// Data is written from each buffer in order, with the final buffer read
918     /// from possible being only partially consumed. This method behaves
919     /// equivalently to a single call to [`try_write()`] with concatenated
920     /// buffers.
921     ///
922     /// This function is usually paired with `writable()`.
923     ///
924     /// [`try_write()`]: TcpStream::try_write()
925     ///
926     /// # Return
927     ///
928     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
929     /// number of bytes written. If the stream is not ready to write data,
930     /// `Err(io::ErrorKind::WouldBlock)` is returned.
931     ///
932     /// # Examples
933     ///
934     /// ```no_run
935     /// use tokio::net::TcpStream;
936     /// use std::error::Error;
937     /// use std::io;
938     ///
939     /// #[tokio::main]
940     /// async fn main() -> Result<(), Box<dyn Error>> {
941     ///     // Connect to a peer
942     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
943     ///
944     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
945     ///
946     ///     loop {
947     ///         // Wait for the socket to be writable
948     ///         stream.writable().await?;
949     ///
950     ///         // Try to write data, this may still fail with `WouldBlock`
951     ///         // if the readiness event is a false positive.
952     ///         match stream.try_write_vectored(&bufs) {
953     ///             Ok(n) => {
954     ///                 break;
955     ///             }
956     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
957     ///                 continue;
958     ///             }
959     ///             Err(e) => {
960     ///                 return Err(e.into());
961     ///             }
962     ///         }
963     ///     }
964     ///
965     ///     Ok(())
966     /// }
967     /// ```
try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize>968     pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
969         use std::io::Write;
970 
971         self.io
972             .registration()
973             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
974     }
975 
976     /// Tries to read or write from the socket using a user-provided IO operation.
977     ///
978     /// If the socket is ready, the provided closure is called. The closure
979     /// should attempt to perform IO operation on the socket by manually
980     /// calling the appropriate syscall. If the operation fails because the
981     /// socket is not actually ready, then the closure should return a
982     /// `WouldBlock` error and the readiness flag is cleared. The return value
983     /// of the closure is then returned by `try_io`.
984     ///
985     /// If the socket is not ready, then the closure is not called
986     /// and a `WouldBlock` error is returned.
987     ///
988     /// The closure should only return a `WouldBlock` error if it has performed
989     /// an IO operation on the socket that failed due to the socket not being
990     /// ready. Returning a `WouldBlock` error in any other situation will
991     /// incorrectly clear the readiness flag, which can cause the socket to
992     /// behave incorrectly.
993     ///
994     /// The closure should not perform the IO operation using any of the methods
995     /// defined on the Tokio `TcpStream` type, as this will mess with the
996     /// readiness flag and can cause the socket to behave incorrectly.
997     ///
998     /// This method is not intended to be used with combined interests.
999     /// The closure should perform only one type of IO operation, so it should not
1000     /// require more than one ready state. This method may panic or sleep forever
1001     /// if it is called with a combined interest.
1002     ///
1003     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
1004     ///
1005     /// [`readable()`]: TcpStream::readable()
1006     /// [`writable()`]: TcpStream::writable()
1007     /// [`ready()`]: TcpStream::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>1008     pub fn try_io<R>(
1009         &self,
1010         interest: Interest,
1011         f: impl FnOnce() -> io::Result<R>,
1012     ) -> io::Result<R> {
1013         self.io
1014             .registration()
1015             .try_io(interest, || self.io.try_io(f))
1016     }
1017 
1018     /// Reads or writes from the socket using a user-provided IO operation.
1019     ///
1020     /// The readiness of the socket is awaited and when the socket is ready,
1021     /// the provided closure is called. The closure should attempt to perform
1022     /// IO operation on the socket by manually calling the appropriate syscall.
1023     /// If the operation fails because the socket is not actually ready,
1024     /// then the closure should return a `WouldBlock` error. In such case the
1025     /// readiness flag is cleared and the socket readiness is awaited again.
1026     /// This loop is repeated until the closure returns an `Ok` or an error
1027     /// other than `WouldBlock`.
1028     ///
1029     /// The closure should only return a `WouldBlock` error if it has performed
1030     /// an IO operation on the socket that failed due to the socket not being
1031     /// ready. Returning a `WouldBlock` error in any other situation will
1032     /// incorrectly clear the readiness flag, which can cause the socket to
1033     /// behave incorrectly.
1034     ///
1035     /// The closure should not perform the IO operation using any of the methods
1036     /// defined on the Tokio `TcpStream` type, as this will mess with the
1037     /// readiness flag and can cause the socket to behave incorrectly.
1038     ///
1039     /// This method is not intended to be used with combined interests.
1040     /// The closure should perform only one type of IO operation, so it should not
1041     /// require more than one ready state. This method may panic or sleep forever
1042     /// if it is called with a combined interest.
async_io<R>( &self, interest: Interest, mut f: impl FnMut() -> io::Result<R>, ) -> io::Result<R>1043     pub async fn async_io<R>(
1044         &self,
1045         interest: Interest,
1046         mut f: impl FnMut() -> io::Result<R>,
1047     ) -> io::Result<R> {
1048         self.io
1049             .registration()
1050             .async_io(interest, || self.io.try_io(&mut f))
1051             .await
1052     }
1053 
1054     /// Receives data on the socket from the remote address to which it is
1055     /// connected, without removing that data from the queue. On success,
1056     /// returns the number of bytes peeked.
1057     ///
1058     /// Successive calls return the same data. This is accomplished by passing
1059     /// `MSG_PEEK` as a flag to the underlying recv system call.
1060     ///
1061     /// # Examples
1062     ///
1063     /// ```no_run
1064     /// use tokio::net::TcpStream;
1065     /// use tokio::io::AsyncReadExt;
1066     /// use std::error::Error;
1067     ///
1068     /// #[tokio::main]
1069     /// async fn main() -> Result<(), Box<dyn Error>> {
1070     ///     // Connect to a peer
1071     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
1072     ///
1073     ///     let mut b1 = [0; 10];
1074     ///     let mut b2 = [0; 10];
1075     ///
1076     ///     // Peek at the data
1077     ///     let n = stream.peek(&mut b1).await?;
1078     ///
1079     ///     // Read the data
1080     ///     assert_eq!(n, stream.read(&mut b2[..n]).await?);
1081     ///     assert_eq!(&b1[..n], &b2[..n]);
1082     ///
1083     ///     Ok(())
1084     /// }
1085     /// ```
1086     ///
1087     /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1088     ///
1089     /// [`read`]: fn@crate::io::AsyncReadExt::read
1090     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
peek(&self, buf: &mut [u8]) -> io::Result<usize>1091     pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1092         self.io
1093             .registration()
1094             .async_io(Interest::READABLE, || self.io.peek(buf))
1095             .await
1096     }
1097 
1098     /// Shuts down the read, write, or both halves of this connection.
1099     ///
1100     /// This function will cause all pending and future I/O on the specified
1101     /// portions to return immediately with an appropriate value (see the
1102     /// documentation of `Shutdown`).
shutdown_std(&self, how: Shutdown) -> io::Result<()>1103     pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1104         self.io.shutdown(how)
1105     }
1106 
1107     /// Gets the value of the `TCP_NODELAY` option on this socket.
1108     ///
1109     /// For more information about this option, see [`set_nodelay`].
1110     ///
1111     /// [`set_nodelay`]: TcpStream::set_nodelay
1112     ///
1113     /// # Examples
1114     ///
1115     /// ```no_run
1116     /// use tokio::net::TcpStream;
1117     ///
1118     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1119     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1120     ///
1121     /// println!("{:?}", stream.nodelay()?);
1122     /// # Ok(())
1123     /// # }
1124     /// ```
nodelay(&self) -> io::Result<bool>1125     pub fn nodelay(&self) -> io::Result<bool> {
1126         self.io.nodelay()
1127     }
1128 
1129     /// Sets the value of the `TCP_NODELAY` option on this socket.
1130     ///
1131     /// If set, this option disables the Nagle algorithm. This means that
1132     /// segments are always sent as soon as possible, even if there is only a
1133     /// small amount of data. When not set, data is buffered until there is a
1134     /// sufficient amount to send out, thereby avoiding the frequent sending of
1135     /// small packets.
1136     ///
1137     /// # Examples
1138     ///
1139     /// ```no_run
1140     /// use tokio::net::TcpStream;
1141     ///
1142     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1143     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1144     ///
1145     /// stream.set_nodelay(true)?;
1146     /// # Ok(())
1147     /// # }
1148     /// ```
set_nodelay(&self, nodelay: bool) -> io::Result<()>1149     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1150         self.io.set_nodelay(nodelay)
1151     }
1152 
1153     cfg_not_wasi! {
1154         /// Reads the linger duration for this socket by getting the `SO_LINGER`
1155         /// option.
1156         ///
1157         /// For more information about this option, see [`set_linger`].
1158         ///
1159         /// [`set_linger`]: TcpStream::set_linger
1160         ///
1161         /// # Examples
1162         ///
1163         /// ```no_run
1164         /// use tokio::net::TcpStream;
1165         ///
1166         /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1167         /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1168         ///
1169         /// println!("{:?}", stream.linger()?);
1170         /// # Ok(())
1171         /// # }
1172         /// ```
1173         pub fn linger(&self) -> io::Result<Option<Duration>> {
1174             socket2::SockRef::from(self).linger()
1175         }
1176 
1177         /// Sets the linger duration of this socket by setting the SO_LINGER option.
1178         ///
1179         /// This option controls the action taken when a stream has unsent messages and the stream is
1180         /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
1181         /// data or until the time expires.
1182         ///
1183         /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
1184         /// way that allows the process to continue as quickly as possible.
1185         ///
1186         /// # Examples
1187         ///
1188         /// ```no_run
1189         /// use tokio::net::TcpStream;
1190         ///
1191         /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1192         /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1193         ///
1194         /// stream.set_linger(None)?;
1195         /// # Ok(())
1196         /// # }
1197         /// ```
1198         pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1199             socket2::SockRef::from(self).set_linger(dur)
1200         }
1201     }
1202 
1203     /// Gets the value of the `IP_TTL` option for this socket.
1204     ///
1205     /// For more information about this option, see [`set_ttl`].
1206     ///
1207     /// [`set_ttl`]: TcpStream::set_ttl
1208     ///
1209     /// # Examples
1210     ///
1211     /// ```no_run
1212     /// use tokio::net::TcpStream;
1213     ///
1214     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1215     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1216     ///
1217     /// println!("{:?}", stream.ttl()?);
1218     /// # Ok(())
1219     /// # }
1220     /// ```
ttl(&self) -> io::Result<u32>1221     pub fn ttl(&self) -> io::Result<u32> {
1222         self.io.ttl()
1223     }
1224 
1225     /// Sets the value for the `IP_TTL` option on this socket.
1226     ///
1227     /// This value sets the time-to-live field that is used in every packet sent
1228     /// from this socket.
1229     ///
1230     /// # Examples
1231     ///
1232     /// ```no_run
1233     /// use tokio::net::TcpStream;
1234     ///
1235     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1236     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1237     ///
1238     /// stream.set_ttl(123)?;
1239     /// # Ok(())
1240     /// # }
1241     /// ```
set_ttl(&self, ttl: u32) -> io::Result<()>1242     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1243         self.io.set_ttl(ttl)
1244     }
1245 
1246     // These lifetime markers also appear in the generated documentation, and make
1247     // it more clear that this is a *borrowed* split.
1248     #[allow(clippy::needless_lifetimes)]
1249     /// Splits a `TcpStream` into a read half and a write half, which can be used
1250     /// to read and write the stream concurrently.
1251     ///
1252     /// This method is more efficient than [`into_split`], but the halves cannot be
1253     /// moved into independently spawned tasks.
1254     ///
1255     /// [`into_split`]: TcpStream::into_split()
split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)1256     pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1257         split(self)
1258     }
1259 
1260     /// Splits a `TcpStream` into a read half and a write half, which can be used
1261     /// to read and write the stream concurrently.
1262     ///
1263     /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1264     /// this comes at the cost of a heap allocation.
1265     ///
1266     /// **Note:** Dropping the write half will shut down the write half of the TCP
1267     /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1268     ///
1269     /// [`split`]: TcpStream::split()
1270     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)1271     pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1272         split_owned(self)
1273     }
1274 
1275     // == Poll IO functions that takes `&self` ==
1276     //
1277     // To read or write without mutable access to the `UnixStream`, combine the
1278     // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1279     // `try_write` methods.
1280 
poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1281     pub(crate) fn poll_read_priv(
1282         &self,
1283         cx: &mut Context<'_>,
1284         buf: &mut ReadBuf<'_>,
1285     ) -> Poll<io::Result<()>> {
1286         // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1287         unsafe { self.io.poll_read(cx, buf) }
1288     }
1289 
poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1290     pub(super) fn poll_write_priv(
1291         &self,
1292         cx: &mut Context<'_>,
1293         buf: &[u8],
1294     ) -> Poll<io::Result<usize>> {
1295         self.io.poll_write(cx, buf)
1296     }
1297 
poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1298     pub(super) fn poll_write_vectored_priv(
1299         &self,
1300         cx: &mut Context<'_>,
1301         bufs: &[io::IoSlice<'_>],
1302     ) -> Poll<io::Result<usize>> {
1303         self.io.poll_write_vectored(cx, bufs)
1304     }
1305 }
1306 
1307 impl TryFrom<std::net::TcpStream> for TcpStream {
1308     type Error = io::Error;
1309 
1310     /// Consumes stream, returning the tokio I/O object.
1311     ///
1312     /// This is equivalent to
1313     /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error>1314     fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1315         Self::from_std(stream)
1316     }
1317 }
1318 
1319 // ===== impl Read / Write =====
1320 
1321 impl AsyncRead for TcpStream {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1322     fn poll_read(
1323         self: Pin<&mut Self>,
1324         cx: &mut Context<'_>,
1325         buf: &mut ReadBuf<'_>,
1326     ) -> Poll<io::Result<()>> {
1327         self.poll_read_priv(cx, buf)
1328     }
1329 }
1330 
1331 impl AsyncWrite for TcpStream {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1332     fn poll_write(
1333         self: Pin<&mut Self>,
1334         cx: &mut Context<'_>,
1335         buf: &[u8],
1336     ) -> Poll<io::Result<usize>> {
1337         self.poll_write_priv(cx, buf)
1338     }
1339 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1340     fn poll_write_vectored(
1341         self: Pin<&mut Self>,
1342         cx: &mut Context<'_>,
1343         bufs: &[io::IoSlice<'_>],
1344     ) -> Poll<io::Result<usize>> {
1345         self.poll_write_vectored_priv(cx, bufs)
1346     }
1347 
is_write_vectored(&self) -> bool1348     fn is_write_vectored(&self) -> bool {
1349         true
1350     }
1351 
1352     #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1353     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1354         // tcp flush is a no-op
1355         Poll::Ready(Ok(()))
1356     }
1357 
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1358     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1359         self.shutdown_std(std::net::Shutdown::Write)?;
1360         Poll::Ready(Ok(()))
1361     }
1362 }
1363 
1364 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1365     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1366         self.io.fmt(f)
1367     }
1368 }
1369 
1370 #[cfg(unix)]
1371 mod sys {
1372     use super::TcpStream;
1373     use std::os::unix::prelude::*;
1374 
1375     impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd1376         fn as_raw_fd(&self) -> RawFd {
1377             self.io.as_raw_fd()
1378         }
1379     }
1380 
1381     impl AsFd for TcpStream {
as_fd(&self) -> BorrowedFd<'_>1382         fn as_fd(&self) -> BorrowedFd<'_> {
1383             unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1384         }
1385     }
1386 }
1387 
1388 cfg_windows! {
1389     use crate::os::windows::io::{AsRawSocket, RawSocket, AsSocket, BorrowedSocket};
1390 
1391     impl AsRawSocket for TcpStream {
1392         fn as_raw_socket(&self) -> RawSocket {
1393             self.io.as_raw_socket()
1394         }
1395     }
1396 
1397     impl AsSocket for TcpStream {
1398         fn as_socket(&self) -> BorrowedSocket<'_> {
1399             unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
1400         }
1401     }
1402 }
1403 
1404 #[cfg(all(tokio_unstable, target_os = "wasi"))]
1405 mod sys {
1406     use super::TcpStream;
1407     use std::os::wasi::prelude::*;
1408 
1409     impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd1410         fn as_raw_fd(&self) -> RawFd {
1411             self.io.as_raw_fd()
1412         }
1413     }
1414 
1415     impl AsFd for TcpStream {
as_fd(&self) -> BorrowedFd<'_>1416         fn as_fd(&self) -> BorrowedFd<'_> {
1417             unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1418         }
1419     }
1420 }
1421