• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::future::poll_fn;
2 use crate::io::{AsyncRead, AsyncWrite, Interest, PollEvented, ReadBuf, Ready};
3 use crate::net::tcp::split::{split, ReadHalf, WriteHalf};
4 use crate::net::tcp::split_owned::{split_owned, OwnedReadHalf, OwnedWriteHalf};
5 use crate::net::{to_socket_addrs, ToSocketAddrs};
6 
7 use std::convert::TryFrom;
8 use std::fmt;
9 use std::io;
10 use std::net::{Shutdown, SocketAddr};
11 use std::pin::Pin;
12 use std::task::{Context, Poll};
13 use std::time::Duration;
14 
15 cfg_io_util! {
16     use bytes::BufMut;
17 }
18 
19 cfg_net! {
20     /// A TCP stream between a local and a remote socket.
21     ///
22     /// A TCP stream can either be created by connecting to an endpoint, via the
23     /// [`connect`] method, or by [accepting] a connection from a [listener]. A
24     /// TCP stream can also be created via the [`TcpSocket`] type.
25     ///
26     /// Reading and writing to a `TcpStream` is usually done using the
27     /// convenience methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`]
28     /// traits.
29     ///
30     /// [`connect`]: method@TcpStream::connect
31     /// [accepting]: method@crate::net::TcpListener::accept
32     /// [listener]: struct@crate::net::TcpListener
33     /// [`TcpSocket`]: struct@crate::net::TcpSocket
34     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
35     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
36     ///
37     /// # Examples
38     ///
39     /// ```no_run
40     /// use tokio::net::TcpStream;
41     /// use tokio::io::AsyncWriteExt;
42     /// use std::error::Error;
43     ///
44     /// #[tokio::main]
45     /// async fn main() -> Result<(), Box<dyn Error>> {
46     ///     // Connect to a peer
47     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
48     ///
49     ///     // Write some data.
50     ///     stream.write_all(b"hello world!").await?;
51     ///
52     ///     Ok(())
53     /// }
54     /// ```
55     ///
56     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
57     ///
58     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
59     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
60     ///
61     /// To shut down the stream in the write direction, you can call the
62     /// [`shutdown()`] method. This will cause the other peer to receive a read of
63     /// length 0, indicating that no more data will be sent. This only closes
64     /// the stream in one direction.
65     ///
66     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
    pub struct TcpStream {
        // The underlying mio socket, held inside a `PollEvented` wrapper; every
        // method on `TcpStream` goes through this field.
        io: PollEvented<mio::net::TcpStream>,
    }
70 }
71 
72 impl TcpStream {
73     /// Opens a TCP connection to a remote host.
74     ///
75     /// `addr` is an address of the remote host. Anything which implements the
76     /// [`ToSocketAddrs`] trait can be supplied as the address.  If `addr`
77     /// yields multiple addresses, connect will be attempted with each of the
78     /// addresses until a connection is successful. If none of the addresses
79     /// result in a successful connection, the error returned from the last
80     /// connection attempt (the last address) is returned.
81     ///
82     /// To configure the socket before connecting, you can use the [`TcpSocket`]
83     /// type.
84     ///
85     /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs
86     /// [`TcpSocket`]: struct@crate::net::TcpSocket
87     ///
88     /// # Examples
89     ///
90     /// ```no_run
91     /// use tokio::net::TcpStream;
92     /// use tokio::io::AsyncWriteExt;
93     /// use std::error::Error;
94     ///
95     /// #[tokio::main]
96     /// async fn main() -> Result<(), Box<dyn Error>> {
97     ///     // Connect to a peer
98     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
99     ///
100     ///     // Write some data.
101     ///     stream.write_all(b"hello world!").await?;
102     ///
103     ///     Ok(())
104     /// }
105     /// ```
106     ///
107     /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
108     ///
109     /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
110     /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream>111     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
112         let addrs = to_socket_addrs(addr).await?;
113 
114         let mut last_err = None;
115 
116         for addr in addrs {
117             match TcpStream::connect_addr(addr).await {
118                 Ok(stream) => return Ok(stream),
119                 Err(e) => last_err = Some(e),
120             }
121         }
122 
123         Err(last_err.unwrap_or_else(|| {
124             io::Error::new(
125                 io::ErrorKind::InvalidInput,
126                 "could not resolve to any address",
127             )
128         }))
129     }
130 
131     /// Establishes a connection to the specified `addr`.
connect_addr(addr: SocketAddr) -> io::Result<TcpStream>132     async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
133         let sys = mio::net::TcpStream::connect(addr)?;
134         TcpStream::connect_mio(sys).await
135     }
136 
connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream>137     pub(crate) async fn connect_mio(sys: mio::net::TcpStream) -> io::Result<TcpStream> {
138         let stream = TcpStream::new(sys)?;
139 
140         // Once we've connected, wait for the stream to be writable as
141         // that's when the actual connection has been initiated. Once we're
142         // writable we check for `take_socket_error` to see if the connect
143         // actually hit an error or not.
144         //
145         // If all that succeeded then we ship everything on up.
146         poll_fn(|cx| stream.io.registration().poll_write_ready(cx)).await?;
147 
148         if let Some(e) = stream.io.take_error()? {
149             return Err(e);
150         }
151 
152         Ok(stream)
153     }
154 
new(connected: mio::net::TcpStream) -> io::Result<TcpStream>155     pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
156         let io = PollEvented::new(connected)?;
157         Ok(TcpStream { io })
158     }
159 
160     /// Creates new `TcpStream` from a `std::net::TcpStream`.
161     ///
162     /// This function is intended to be used to wrap a TCP stream from the
163     /// standard library in the Tokio equivalent. The conversion assumes nothing
164     /// about the underlying stream; it is left up to the user to set it in
165     /// non-blocking mode.
166     ///
167     /// # Examples
168     ///
169     /// ```rust,no_run
170     /// use std::error::Error;
171     /// use tokio::net::TcpStream;
172     ///
173     /// #[tokio::main]
174     /// async fn main() -> Result<(), Box<dyn Error>> {
175     ///     let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
176     ///     std_stream.set_nonblocking(true)?;
177     ///     let stream = TcpStream::from_std(std_stream)?;
178     ///     Ok(())
179     /// }
180     /// ```
181     ///
182     /// # Panics
183     ///
184     /// This function panics if thread-local runtime is not set.
185     ///
186     /// The runtime is usually set implicitly when this function is called
187     /// from a future driven by a tokio runtime, otherwise runtime can be set
188     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_std(stream: std::net::TcpStream) -> io::Result<TcpStream>189     pub fn from_std(stream: std::net::TcpStream) -> io::Result<TcpStream> {
190         let io = mio::net::TcpStream::from_std(stream);
191         let io = PollEvented::new(io)?;
192         Ok(TcpStream { io })
193     }
194 
195     /// Turns a [`tokio::net::TcpStream`] into a [`std::net::TcpStream`].
196     ///
197     /// The returned [`std::net::TcpStream`] will have nonblocking mode set as `true`.
198     /// Use [`set_nonblocking`] to change the blocking mode if needed.
199     ///
200     /// # Examples
201     ///
202     /// ```
203     /// use std::error::Error;
204     /// use std::io::Read;
205     /// use tokio::net::TcpListener;
206     /// # use tokio::net::TcpStream;
207     /// # use tokio::io::AsyncWriteExt;
208     ///
209     /// #[tokio::main]
210     /// async fn main() -> Result<(), Box<dyn Error>> {
211     ///     let mut data = [0u8; 12];
212     ///     let listener = TcpListener::bind("127.0.0.1:34254").await?;
213     /// #   let handle = tokio::spawn(async {
214     /// #       let mut stream: TcpStream = TcpStream::connect("127.0.0.1:34254").await.unwrap();
215     /// #       stream.write(b"Hello world!").await.unwrap();
216     /// #   });
217     ///     let (tokio_tcp_stream, _) = listener.accept().await?;
218     ///     let mut std_tcp_stream = tokio_tcp_stream.into_std()?;
219     /// #   handle.await.expect("The task being joined has panicked");
220     ///     std_tcp_stream.set_nonblocking(false)?;
221     ///     std_tcp_stream.read_exact(&mut data)?;
222     /// #   assert_eq!(b"Hello world!", &data);
223     ///     Ok(())
224     /// }
225     /// ```
226     /// [`tokio::net::TcpStream`]: TcpStream
227     /// [`std::net::TcpStream`]: std::net::TcpStream
228     /// [`set_nonblocking`]: fn@std::net::TcpStream::set_nonblocking
into_std(self) -> io::Result<std::net::TcpStream>229     pub fn into_std(self) -> io::Result<std::net::TcpStream> {
230         #[cfg(unix)]
231         {
232             use std::os::unix::io::{FromRawFd, IntoRawFd};
233             self.io
234                 .into_inner()
235                 .map(|io| io.into_raw_fd())
236                 .map(|raw_fd| unsafe { std::net::TcpStream::from_raw_fd(raw_fd) })
237         }
238 
239         #[cfg(windows)]
240         {
241             use std::os::windows::io::{FromRawSocket, IntoRawSocket};
242             self.io
243                 .into_inner()
244                 .map(|io| io.into_raw_socket())
245                 .map(|raw_socket| unsafe { std::net::TcpStream::from_raw_socket(raw_socket) })
246         }
247     }
248 
249     /// Returns the local address that this stream is bound to.
250     ///
251     /// # Examples
252     ///
253     /// ```no_run
254     /// use tokio::net::TcpStream;
255     ///
256     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
257     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
258     ///
259     /// println!("{:?}", stream.local_addr()?);
260     /// # Ok(())
261     /// # }
262     /// ```
local_addr(&self) -> io::Result<SocketAddr>263     pub fn local_addr(&self) -> io::Result<SocketAddr> {
264         self.io.local_addr()
265     }
266 
267     /// Returns the remote address that this stream is connected to.
268     ///
269     /// # Examples
270     ///
271     /// ```no_run
272     /// use tokio::net::TcpStream;
273     ///
274     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
275     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
276     ///
277     /// println!("{:?}", stream.peer_addr()?);
278     /// # Ok(())
279     /// # }
280     /// ```
peer_addr(&self) -> io::Result<SocketAddr>281     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
282         self.io.peer_addr()
283     }
284 
285     /// Attempts to receive data on the socket, without removing that data from
286     /// the queue, registering the current task for wakeup if data is not yet
287     /// available.
288     ///
289     /// Note that on multiple calls to `poll_peek`, `poll_read` or
290     /// `poll_read_ready`, only the `Waker` from the `Context` passed to the
291     /// most recent call is scheduled to receive a wakeup. (However,
292     /// `poll_write` retains a second, independent waker.)
293     ///
294     /// # Return value
295     ///
296     /// The function returns:
297     ///
298     /// * `Poll::Pending` if data is not yet available.
299     /// * `Poll::Ready(Ok(n))` if data is available. `n` is the number of bytes peeked.
300     /// * `Poll::Ready(Err(e))` if an error is encountered.
301     ///
302     /// # Errors
303     ///
304     /// This function may encounter any standard I/O error except `WouldBlock`.
305     ///
306     /// # Examples
307     ///
308     /// ```no_run
309     /// use tokio::io::{self, ReadBuf};
310     /// use tokio::net::TcpStream;
311     ///
312     /// use futures::future::poll_fn;
313     ///
314     /// #[tokio::main]
315     /// async fn main() -> io::Result<()> {
316     ///     let stream = TcpStream::connect("127.0.0.1:8000").await?;
317     ///     let mut buf = [0; 10];
318     ///     let mut buf = ReadBuf::new(&mut buf);
319     ///
320     ///     poll_fn(|cx| {
321     ///         stream.poll_peek(cx, &mut buf)
322     ///     }).await?;
323     ///
324     ///     Ok(())
325     /// }
326     /// ```
poll_peek( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<usize>>327     pub fn poll_peek(
328         &self,
329         cx: &mut Context<'_>,
330         buf: &mut ReadBuf<'_>,
331     ) -> Poll<io::Result<usize>> {
332         loop {
333             let ev = ready!(self.io.registration().poll_read_ready(cx))?;
334 
335             let b = unsafe {
336                 &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8])
337             };
338 
339             match self.io.peek(b) {
340                 Ok(ret) => {
341                     unsafe { buf.assume_init(ret) };
342                     buf.advance(ret);
343                     return Poll::Ready(Ok(ret));
344                 }
345                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
346                     self.io.registration().clear_readiness(ev);
347                 }
348                 Err(e) => return Poll::Ready(Err(e)),
349             }
350         }
351     }
352 
353     /// Waits for any of the requested ready states.
354     ///
355     /// This function is usually paired with `try_read()` or `try_write()`. It
356     /// can be used to concurrently read / write to the same socket on a single
357     /// task without splitting the socket.
358     ///
359     /// # Cancel safety
360     ///
361     /// This method is cancel safe. Once a readiness event occurs, the method
362     /// will continue to return immediately until the readiness event is
363     /// consumed by an attempt to read or write that fails with `WouldBlock` or
364     /// `Poll::Pending`.
365     ///
366     /// # Examples
367     ///
368     /// Concurrently read and write to the stream on the same task without
369     /// splitting.
370     ///
371     /// ```no_run
372     /// use tokio::io::Interest;
373     /// use tokio::net::TcpStream;
374     /// use std::error::Error;
375     /// use std::io;
376     ///
377     /// #[tokio::main]
378     /// async fn main() -> Result<(), Box<dyn Error>> {
379     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
380     ///
381     ///     loop {
382     ///         let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await?;
383     ///
384     ///         if ready.is_readable() {
385     ///             let mut data = vec![0; 1024];
386     ///             // Try to read data, this may still fail with `WouldBlock`
387     ///             // if the readiness event is a false positive.
388     ///             match stream.try_read(&mut data) {
389     ///                 Ok(n) => {
390     ///                     println!("read {} bytes", n);
391     ///                 }
392     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
393     ///                     continue;
394     ///                 }
395     ///                 Err(e) => {
396     ///                     return Err(e.into());
397     ///                 }
398     ///             }
399     ///
400     ///         }
401     ///
402     ///         if ready.is_writable() {
403     ///             // Try to write data, this may still fail with `WouldBlock`
404     ///             // if the readiness event is a false positive.
405     ///             match stream.try_write(b"hello world") {
406     ///                 Ok(n) => {
407     ///                     println!("write {} bytes", n);
408     ///                 }
409     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
410     ///                     continue
411     ///                 }
412     ///                 Err(e) => {
413     ///                     return Err(e.into());
414     ///                 }
415     ///             }
416     ///         }
417     ///     }
418     /// }
419     /// ```
ready(&self, interest: Interest) -> io::Result<Ready>420     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
421         let event = self.io.registration().readiness(interest).await?;
422         Ok(event.ready)
423     }
424 
425     /// Waits for the socket to become readable.
426     ///
427     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
428     /// paired with `try_read()`.
429     ///
430     /// # Cancel safety
431     ///
432     /// This method is cancel safe. Once a readiness event occurs, the method
433     /// will continue to return immediately until the readiness event is
434     /// consumed by an attempt to read that fails with `WouldBlock` or
435     /// `Poll::Pending`.
436     ///
437     /// # Examples
438     ///
439     /// ```no_run
440     /// use tokio::net::TcpStream;
441     /// use std::error::Error;
442     /// use std::io;
443     ///
444     /// #[tokio::main]
445     /// async fn main() -> Result<(), Box<dyn Error>> {
446     ///     // Connect to a peer
447     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
448     ///
449     ///     let mut msg = vec![0; 1024];
450     ///
451     ///     loop {
452     ///         // Wait for the socket to be readable
453     ///         stream.readable().await?;
454     ///
455     ///         // Try to read data, this may still fail with `WouldBlock`
456     ///         // if the readiness event is a false positive.
457     ///         match stream.try_read(&mut msg) {
458     ///             Ok(n) => {
459     ///                 msg.truncate(n);
460     ///                 break;
461     ///             }
462     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
463     ///                 continue;
464     ///             }
465     ///             Err(e) => {
466     ///                 return Err(e.into());
467     ///             }
468     ///         }
469     ///     }
470     ///
471     ///     println!("GOT = {:?}", msg);
472     ///     Ok(())
473     /// }
474     /// ```
readable(&self) -> io::Result<()>475     pub async fn readable(&self) -> io::Result<()> {
476         self.ready(Interest::READABLE).await?;
477         Ok(())
478     }
479 
480     /// Polls for read readiness.
481     ///
482     /// If the tcp stream is not currently ready for reading, this method will
483     /// store a clone of the `Waker` from the provided `Context`. When the tcp
484     /// stream becomes ready for reading, `Waker::wake` will be called on the
485     /// waker.
486     ///
487     /// Note that on multiple calls to `poll_read_ready`, `poll_read` or
488     /// `poll_peek`, only the `Waker` from the `Context` passed to the most
489     /// recent call is scheduled to receive a wakeup. (However,
490     /// `poll_write_ready` retains a second, independent waker.)
491     ///
492     /// This function is intended for cases where creating and pinning a future
493     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
494     /// preferred, as this supports polling from multiple tasks at once.
495     ///
496     /// # Return value
497     ///
498     /// The function returns:
499     ///
500     /// * `Poll::Pending` if the tcp stream is not ready for reading.
501     /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for reading.
502     /// * `Poll::Ready(Err(e))` if an error is encountered.
503     ///
504     /// # Errors
505     ///
506     /// This function may encounter any standard I/O error except `WouldBlock`.
507     ///
508     /// [`readable`]: method@Self::readable
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>509     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
510         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
511     }
512 
513     /// Tries to read data from the stream into the provided buffer, returning how
514     /// many bytes were read.
515     ///
516     /// Receives any pending data from the socket but does not wait for new data
517     /// to arrive. On success, returns the number of bytes read. Because
518     /// `try_read()` is non-blocking, the buffer does not have to be stored by
519     /// the async task and can exist entirely on the stack.
520     ///
521     /// Usually, [`readable()`] or [`ready()`] is used with this function.
522     ///
523     /// [`readable()`]: TcpStream::readable()
524     /// [`ready()`]: TcpStream::ready()
525     ///
526     /// # Return
527     ///
528     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
529     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
530     /// and will no longer yield data. If the stream is not ready to read data
531     /// `Err(io::ErrorKind::WouldBlock)` is returned.
532     ///
533     /// # Examples
534     ///
535     /// ```no_run
536     /// use tokio::net::TcpStream;
537     /// use std::error::Error;
538     /// use std::io;
539     ///
540     /// #[tokio::main]
541     /// async fn main() -> Result<(), Box<dyn Error>> {
542     ///     // Connect to a peer
543     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
544     ///
545     ///     loop {
546     ///         // Wait for the socket to be readable
547     ///         stream.readable().await?;
548     ///
549     ///         // Creating the buffer **after** the `await` prevents it from
550     ///         // being stored in the async task.
551     ///         let mut buf = [0; 4096];
552     ///
553     ///         // Try to read data, this may still fail with `WouldBlock`
554     ///         // if the readiness event is a false positive.
555     ///         match stream.try_read(&mut buf) {
556     ///             Ok(0) => break,
557     ///             Ok(n) => {
558     ///                 println!("read {} bytes", n);
559     ///             }
560     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
561     ///                 continue;
562     ///             }
563     ///             Err(e) => {
564     ///                 return Err(e.into());
565     ///             }
566     ///         }
567     ///     }
568     ///
569     ///     Ok(())
570     /// }
571     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>572     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
573         use std::io::Read;
574 
575         self.io
576             .registration()
577             .try_io(Interest::READABLE, || (&*self.io).read(buf))
578     }
579 
580     /// Tries to read data from the stream into the provided buffers, returning
581     /// how many bytes were read.
582     ///
583     /// Data is copied to fill each buffer in order, with the final buffer
584     /// written to possibly being only partially filled. This method behaves
585     /// equivalently to a single call to [`try_read()`] with concatenated
586     /// buffers.
587     ///
588     /// Receives any pending data from the socket but does not wait for new data
589     /// to arrive. On success, returns the number of bytes read. Because
590     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
591     /// stored by the async task and can exist entirely on the stack.
592     ///
593     /// Usually, [`readable()`] or [`ready()`] is used with this function.
594     ///
595     /// [`try_read()`]: TcpStream::try_read()
596     /// [`readable()`]: TcpStream::readable()
597     /// [`ready()`]: TcpStream::ready()
598     ///
599     /// # Return
600     ///
601     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
602     /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
603     /// and will no longer yield data. If the stream is not ready to read data
604     /// `Err(io::ErrorKind::WouldBlock)` is returned.
605     ///
606     /// # Examples
607     ///
608     /// ```no_run
609     /// use tokio::net::TcpStream;
610     /// use std::error::Error;
611     /// use std::io::{self, IoSliceMut};
612     ///
613     /// #[tokio::main]
614     /// async fn main() -> Result<(), Box<dyn Error>> {
615     ///     // Connect to a peer
616     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
617     ///
618     ///     loop {
619     ///         // Wait for the socket to be readable
620     ///         stream.readable().await?;
621     ///
622     ///         // Creating the buffer **after** the `await` prevents it from
623     ///         // being stored in the async task.
624     ///         let mut buf_a = [0; 512];
625     ///         let mut buf_b = [0; 1024];
626     ///         let mut bufs = [
627     ///             IoSliceMut::new(&mut buf_a),
628     ///             IoSliceMut::new(&mut buf_b),
629     ///         ];
630     ///
631     ///         // Try to read data, this may still fail with `WouldBlock`
632     ///         // if the readiness event is a false positive.
633     ///         match stream.try_read_vectored(&mut bufs) {
634     ///             Ok(0) => break,
635     ///             Ok(n) => {
636     ///                 println!("read {} bytes", n);
637     ///             }
638     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
639     ///                 continue;
640     ///             }
641     ///             Err(e) => {
642     ///                 return Err(e.into());
643     ///             }
644     ///         }
645     ///     }
646     ///
647     ///     Ok(())
648     /// }
649     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>650     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
651         use std::io::Read;
652 
653         self.io
654             .registration()
655             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
656     }
657 
658     cfg_io_util! {
659         /// Tries to read data from the stream into the provided buffer, advancing the
660         /// buffer's internal cursor, returning how many bytes were read.
661         ///
662         /// Receives any pending data from the socket but does not wait for new data
663         /// to arrive. On success, returns the number of bytes read. Because
664         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
665         /// the async task and can exist entirely on the stack.
666         ///
667         /// Usually, [`readable()`] or [`ready()`] is used with this function.
668         ///
669         /// [`readable()`]: TcpStream::readable()
670         /// [`ready()`]: TcpStream::ready()
671         ///
672         /// # Return
673         ///
674         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
675         /// number of bytes read. `Ok(0)` indicates the stream's read half is closed
676         /// and will no longer yield data. If the stream is not ready to read data
677         /// `Err(io::ErrorKind::WouldBlock)` is returned.
678         ///
679         /// # Examples
680         ///
681         /// ```no_run
682         /// use tokio::net::TcpStream;
683         /// use std::error::Error;
684         /// use std::io;
685         ///
686         /// #[tokio::main]
687         /// async fn main() -> Result<(), Box<dyn Error>> {
688         ///     // Connect to a peer
689         ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
690         ///
691         ///     loop {
692         ///         // Wait for the socket to be readable
693         ///         stream.readable().await?;
694         ///
695         ///         let mut buf = Vec::with_capacity(4096);
696         ///
697         ///         // Try to read data, this may still fail with `WouldBlock`
698         ///         // if the readiness event is a false positive.
699         ///         match stream.try_read_buf(&mut buf) {
700         ///             Ok(0) => break,
701         ///             Ok(n) => {
702         ///                 println!("read {} bytes", n);
703         ///             }
704         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
705         ///                 continue;
706         ///             }
707         ///             Err(e) => {
708         ///                 return Err(e.into());
709         ///             }
710         ///         }
711         ///     }
712         ///
713         ///     Ok(())
714         /// }
715         /// ```
716         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
717             self.io.registration().try_io(Interest::READABLE, || {
718                 use std::io::Read;
719 
720                 let dst = buf.chunk_mut();
721                 let dst =
722                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
723 
724                 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
725                 // buffer.
726                 let n = (&*self.io).read(dst)?;
727 
728                 unsafe {
729                     buf.advance_mut(n);
730                 }
731 
732                 Ok(n)
733             })
734         }
735     }
736 
737     /// Waits for the socket to become writable.
738     ///
739     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
740     /// paired with `try_write()`.
741     ///
742     /// # Cancel safety
743     ///
744     /// This method is cancel safe. Once a readiness event occurs, the method
745     /// will continue to return immediately until the readiness event is
746     /// consumed by an attempt to write that fails with `WouldBlock` or
747     /// `Poll::Pending`.
748     ///
749     /// # Examples
750     ///
751     /// ```no_run
752     /// use tokio::net::TcpStream;
753     /// use std::error::Error;
754     /// use std::io;
755     ///
756     /// #[tokio::main]
757     /// async fn main() -> Result<(), Box<dyn Error>> {
758     ///     // Connect to a peer
759     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
760     ///
761     ///     loop {
762     ///         // Wait for the socket to be writable
763     ///         stream.writable().await?;
764     ///
765     ///         // Try to write data, this may still fail with `WouldBlock`
766     ///         // if the readiness event is a false positive.
767     ///         match stream.try_write(b"hello world") {
768     ///             Ok(n) => {
769     ///                 break;
770     ///             }
771     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
772     ///                 continue;
773     ///             }
774     ///             Err(e) => {
775     ///                 return Err(e.into());
776     ///             }
777     ///         }
778     ///     }
779     ///
780     ///     Ok(())
781     /// }
782     /// ```
writable(&self) -> io::Result<()>783     pub async fn writable(&self) -> io::Result<()> {
784         self.ready(Interest::WRITABLE).await?;
785         Ok(())
786     }
787 
788     /// Polls for write readiness.
789     ///
790     /// If the tcp stream is not currently ready for writing, this method will
791     /// store a clone of the `Waker` from the provided `Context`. When the tcp
792     /// stream becomes ready for writing, `Waker::wake` will be called on the
793     /// waker.
794     ///
795     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
796     /// the `Waker` from the `Context` passed to the most recent call is
797     /// scheduled to receive a wakeup. (However, `poll_read_ready` retains a
798     /// second, independent waker.)
799     ///
800     /// This function is intended for cases where creating and pinning a future
801     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
802     /// preferred, as this supports polling from multiple tasks at once.
803     ///
804     /// # Return value
805     ///
806     /// The function returns:
807     ///
808     /// * `Poll::Pending` if the tcp stream is not ready for writing.
809     /// * `Poll::Ready(Ok(()))` if the tcp stream is ready for writing.
810     /// * `Poll::Ready(Err(e))` if an error is encountered.
811     ///
812     /// # Errors
813     ///
814     /// This function may encounter any standard I/O error except `WouldBlock`.
815     ///
816     /// [`writable`]: method@Self::writable
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>817     pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
818         self.io.registration().poll_write_ready(cx).map_ok(|_| ())
819     }
820 
821     /// Try to write a buffer to the stream, returning how many bytes were
822     /// written.
823     ///
824     /// The function will attempt to write the entire contents of `buf`, but
825     /// only part of the buffer may be written.
826     ///
827     /// This function is usually paired with `writable()`.
828     ///
829     /// # Return
830     ///
831     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
832     /// number of bytes written. If the stream is not ready to write data,
833     /// `Err(io::ErrorKind::WouldBlock)` is returned.
834     ///
835     /// # Examples
836     ///
837     /// ```no_run
838     /// use tokio::net::TcpStream;
839     /// use std::error::Error;
840     /// use std::io;
841     ///
842     /// #[tokio::main]
843     /// async fn main() -> Result<(), Box<dyn Error>> {
844     ///     // Connect to a peer
845     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
846     ///
847     ///     loop {
848     ///         // Wait for the socket to be writable
849     ///         stream.writable().await?;
850     ///
851     ///         // Try to write data, this may still fail with `WouldBlock`
852     ///         // if the readiness event is a false positive.
853     ///         match stream.try_write(b"hello world") {
854     ///             Ok(n) => {
855     ///                 break;
856     ///             }
857     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
858     ///                 continue;
859     ///             }
860     ///             Err(e) => {
861     ///                 return Err(e.into());
862     ///             }
863     ///         }
864     ///     }
865     ///
866     ///     Ok(())
867     /// }
868     /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>869     pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
870         use std::io::Write;
871 
872         self.io
873             .registration()
874             .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
875     }
876 
877     /// Tries to write several buffers to the stream, returning how many bytes
878     /// were written.
879     ///
880     /// Data is written from each buffer in order, with the final buffer read
881     /// from possibly being only partially consumed. This method behaves
882     /// equivalently to a single call to [`try_write()`] with concatenated
883     /// buffers.
884     ///
885     /// This function is usually paired with `writable()`.
886     ///
887     /// [`try_write()`]: TcpStream::try_write()
888     ///
889     /// # Return
890     ///
891     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
892     /// number of bytes written. If the stream is not ready to write data,
893     /// `Err(io::ErrorKind::WouldBlock)` is returned.
894     ///
895     /// # Examples
896     ///
897     /// ```no_run
898     /// use tokio::net::TcpStream;
899     /// use std::error::Error;
900     /// use std::io;
901     ///
902     /// #[tokio::main]
903     /// async fn main() -> Result<(), Box<dyn Error>> {
904     ///     // Connect to a peer
905     ///     let stream = TcpStream::connect("127.0.0.1:8080").await?;
906     ///
907     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
908     ///
909     ///     loop {
910     ///         // Wait for the socket to be writable
911     ///         stream.writable().await?;
912     ///
913     ///         // Try to write data, this may still fail with `WouldBlock`
914     ///         // if the readiness event is a false positive.
915     ///         match stream.try_write_vectored(&bufs) {
916     ///             Ok(n) => {
917     ///                 break;
918     ///             }
919     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
920     ///                 continue;
921     ///             }
922     ///             Err(e) => {
923     ///                 return Err(e.into());
924     ///             }
925     ///         }
926     ///     }
927     ///
928     ///     Ok(())
929     /// }
930     /// ```
try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize>931     pub fn try_write_vectored(&self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
932         use std::io::Write;
933 
934         self.io
935             .registration()
936             .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(bufs))
937     }
938 
939     /// Tries to read or write from the socket using a user-provided IO operation.
940     ///
941     /// If the socket is ready, the provided closure is called. The closure
942     /// should attempt to perform an IO operation on the socket by manually
943     /// calling the appropriate syscall. If the operation fails because the
944     /// socket is not actually ready, then the closure should return a
945     /// `WouldBlock` error and the readiness flag is cleared. The return value
946     /// of the closure is then returned by `try_io`.
947     ///
948     /// If the socket is not ready, then the closure is not called
949     /// and a `WouldBlock` error is returned.
950     ///
951     /// The closure should only return a `WouldBlock` error if it has performed
952     /// an IO operation on the socket that failed due to the socket not being
953     /// ready. Returning a `WouldBlock` error in any other situation will
954     /// incorrectly clear the readiness flag, which can cause the socket to
955     /// behave incorrectly.
956     ///
957     /// The closure should not perform the IO operation using any of the methods
958     /// defined on the Tokio `TcpStream` type, as this will mess with the
959     /// readiness flag and can cause the socket to behave incorrectly.
960     ///
961     /// Usually, [`readable()`], [`writable()`] or [`ready()`] is used with this function.
962     ///
963     /// [`readable()`]: TcpStream::readable()
964     /// [`writable()`]: TcpStream::writable()
965     /// [`ready()`]: TcpStream::ready()
try_io<R>( &self, interest: Interest, f: impl FnOnce() -> io::Result<R>, ) -> io::Result<R>966     pub fn try_io<R>(
967         &self,
968         interest: Interest,
969         f: impl FnOnce() -> io::Result<R>,
970     ) -> io::Result<R> {
971         self.io.registration().try_io(interest, f)
972     }
973 
974     /// Receives data on the socket from the remote address to which it is
975     /// connected, without removing that data from the queue. On success,
976     /// returns the number of bytes peeked.
977     ///
978     /// Successive calls return the same data. This is accomplished by passing
979     /// `MSG_PEEK` as a flag to the underlying recv system call.
980     ///
981     /// # Examples
982     ///
983     /// ```no_run
984     /// use tokio::net::TcpStream;
985     /// use tokio::io::AsyncReadExt;
986     /// use std::error::Error;
987     ///
988     /// #[tokio::main]
989     /// async fn main() -> Result<(), Box<dyn Error>> {
990     ///     // Connect to a peer
991     ///     let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
992     ///
993     ///     let mut b1 = [0; 10];
994     ///     let mut b2 = [0; 10];
995     ///
996     ///     // Peek at the data
997     ///     let n = stream.peek(&mut b1).await?;
998     ///
999     ///     // Read the data
1000     ///     assert_eq!(n, stream.read(&mut b2[..n]).await?);
1001     ///     assert_eq!(&b1[..n], &b2[..n]);
1002     ///
1003     ///     Ok(())
1004     /// }
1005     /// ```
1006     ///
1007     /// The [`read`] method is defined on the [`AsyncReadExt`] trait.
1008     ///
1009     /// [`read`]: fn@crate::io::AsyncReadExt::read
1010     /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
peek(&self, buf: &mut [u8]) -> io::Result<usize>1011     pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
1012         self.io
1013             .registration()
1014             .async_io(Interest::READABLE, || self.io.peek(buf))
1015             .await
1016     }
1017 
1018     /// Shuts down the read, write, or both halves of this connection.
1019     ///
1020     /// This function will cause all pending and future I/O on the specified
1021     /// portions to return immediately with an appropriate value (see the
1022     /// documentation of `Shutdown`).
shutdown_std(&self, how: Shutdown) -> io::Result<()>1023     pub(super) fn shutdown_std(&self, how: Shutdown) -> io::Result<()> {
1024         self.io.shutdown(how)
1025     }
1026 
1027     /// Gets the value of the `TCP_NODELAY` option on this socket.
1028     ///
1029     /// For more information about this option, see [`set_nodelay`].
1030     ///
1031     /// [`set_nodelay`]: TcpStream::set_nodelay
1032     ///
1033     /// # Examples
1034     ///
1035     /// ```no_run
1036     /// use tokio::net::TcpStream;
1037     ///
1038     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1039     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1040     ///
1041     /// println!("{:?}", stream.nodelay()?);
1042     /// # Ok(())
1043     /// # }
1044     /// ```
nodelay(&self) -> io::Result<bool>1045     pub fn nodelay(&self) -> io::Result<bool> {
1046         self.io.nodelay()
1047     }
1048 
1049     /// Sets the value of the `TCP_NODELAY` option on this socket.
1050     ///
1051     /// If set, this option disables the Nagle algorithm. This means that
1052     /// segments are always sent as soon as possible, even if there is only a
1053     /// small amount of data. When not set, data is buffered until there is a
1054     /// sufficient amount to send out, thereby avoiding the frequent sending of
1055     /// small packets.
1056     ///
1057     /// # Examples
1058     ///
1059     /// ```no_run
1060     /// use tokio::net::TcpStream;
1061     ///
1062     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1063     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1064     ///
1065     /// stream.set_nodelay(true)?;
1066     /// # Ok(())
1067     /// # }
1068     /// ```
set_nodelay(&self, nodelay: bool) -> io::Result<()>1069     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
1070         self.io.set_nodelay(nodelay)
1071     }
1072 
1073     /// Reads the linger duration for this socket by getting the `SO_LINGER`
1074     /// option.
1075     ///
1076     /// For more information about this option, see [`set_linger`].
1077     ///
1078     /// [`set_linger`]: TcpStream::set_linger
1079     ///
1080     /// # Examples
1081     ///
1082     /// ```no_run
1083     /// use tokio::net::TcpStream;
1084     ///
1085     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1086     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1087     ///
1088     /// println!("{:?}", stream.linger()?);
1089     /// # Ok(())
1090     /// # }
1091     /// ```
linger(&self) -> io::Result<Option<Duration>>1092     pub fn linger(&self) -> io::Result<Option<Duration>> {
1093         let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
1094 
1095         mio_socket.get_linger()
1096     }
1097 
1098     /// Sets the linger duration of this socket by setting the SO_LINGER option.
1099     ///
1100     /// This option controls the action taken when a stream has unsent messages and the stream is
1101     /// closed. If SO_LINGER is set, the system shall block the process until it can transmit the
1102     /// data or until the time expires.
1103     ///
1104     /// If SO_LINGER is not specified, and the stream is closed, the system handles the call in a
1105     /// way that allows the process to continue as quickly as possible.
1106     ///
1107     /// # Examples
1108     ///
1109     /// ```no_run
1110     /// use tokio::net::TcpStream;
1111     ///
1112     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1113     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1114     ///
1115     /// stream.set_linger(None)?;
1116     /// # Ok(())
1117     /// # }
1118     /// ```
set_linger(&self, dur: Option<Duration>) -> io::Result<()>1119     pub fn set_linger(&self, dur: Option<Duration>) -> io::Result<()> {
1120         let mio_socket = std::mem::ManuallyDrop::new(self.to_mio());
1121 
1122         mio_socket.set_linger(dur)
1123     }
1124 
to_mio(&self) -> mio::net::TcpSocket1125     fn to_mio(&self) -> mio::net::TcpSocket {
1126         #[cfg(windows)]
1127         {
1128             use std::os::windows::io::{AsRawSocket, FromRawSocket};
1129             unsafe { mio::net::TcpSocket::from_raw_socket(self.as_raw_socket()) }
1130         }
1131 
1132         #[cfg(unix)]
1133         {
1134             use std::os::unix::io::{AsRawFd, FromRawFd};
1135             unsafe { mio::net::TcpSocket::from_raw_fd(self.as_raw_fd()) }
1136         }
1137     }
1138 
1139     /// Gets the value of the `IP_TTL` option for this socket.
1140     ///
1141     /// For more information about this option, see [`set_ttl`].
1142     ///
1143     /// [`set_ttl`]: TcpStream::set_ttl
1144     ///
1145     /// # Examples
1146     ///
1147     /// ```no_run
1148     /// use tokio::net::TcpStream;
1149     ///
1150     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1151     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1152     ///
1153     /// println!("{:?}", stream.ttl()?);
1154     /// # Ok(())
1155     /// # }
1156     /// ```
ttl(&self) -> io::Result<u32>1157     pub fn ttl(&self) -> io::Result<u32> {
1158         self.io.ttl()
1159     }
1160 
1161     /// Sets the value for the `IP_TTL` option on this socket.
1162     ///
1163     /// This value sets the time-to-live field that is used in every packet sent
1164     /// from this socket.
1165     ///
1166     /// # Examples
1167     ///
1168     /// ```no_run
1169     /// use tokio::net::TcpStream;
1170     ///
1171     /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
1172     /// let stream = TcpStream::connect("127.0.0.1:8080").await?;
1173     ///
1174     /// stream.set_ttl(123)?;
1175     /// # Ok(())
1176     /// # }
1177     /// ```
set_ttl(&self, ttl: u32) -> io::Result<()>1178     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
1179         self.io.set_ttl(ttl)
1180     }
1181 
1182     // These lifetime markers also appear in the generated documentation, and make
1183     // it more clear that this is a *borrowed* split.
1184     #[allow(clippy::needless_lifetimes)]
1185     /// Splits a `TcpStream` into a read half and a write half, which can be used
1186     /// to read and write the stream concurrently.
1187     ///
1188     /// This method is more efficient than [`into_split`], but the halves cannot be
1189     /// moved into independently spawned tasks.
1190     ///
1191     /// [`into_split`]: TcpStream::into_split()
split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>)1192     pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
1193         split(self)
1194     }
1195 
1196     /// Splits a `TcpStream` into a read half and a write half, which can be used
1197     /// to read and write the stream concurrently.
1198     ///
1199     /// Unlike [`split`], the owned halves can be moved to separate tasks, however
1200     /// this comes at the cost of a heap allocation.
1201     ///
1202     /// **Note:** Dropping the write half will shut down the write half of the TCP
1203     /// stream. This is equivalent to calling [`shutdown()`] on the `TcpStream`.
1204     ///
1205     /// [`split`]: TcpStream::split()
1206     /// [`shutdown()`]: fn@crate::io::AsyncWriteExt::shutdown
into_split(self) -> (OwnedReadHalf, OwnedWriteHalf)1207     pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
1208         split_owned(self)
1209     }
1210 
1211     // == Poll IO functions that take `&self` ==
1212     //
1213     // To read or write without mutable access to the `TcpStream`, combine the
1214     // `poll_read_ready` or `poll_write_ready` methods with the `try_read` or
1215     // `try_write` methods.
1216 
poll_read_priv( &self, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1217     pub(crate) fn poll_read_priv(
1218         &self,
1219         cx: &mut Context<'_>,
1220         buf: &mut ReadBuf<'_>,
1221     ) -> Poll<io::Result<()>> {
1222         // Safety: `TcpStream::read` correctly handles reads into uninitialized memory
1223         unsafe { self.io.poll_read(cx, buf) }
1224     }
1225 
poll_write_priv( &self, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1226     pub(super) fn poll_write_priv(
1227         &self,
1228         cx: &mut Context<'_>,
1229         buf: &[u8],
1230     ) -> Poll<io::Result<usize>> {
1231         self.io.poll_write(cx, buf)
1232     }
1233 
poll_write_vectored_priv( &self, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1234     pub(super) fn poll_write_vectored_priv(
1235         &self,
1236         cx: &mut Context<'_>,
1237         bufs: &[io::IoSlice<'_>],
1238     ) -> Poll<io::Result<usize>> {
1239         self.io.poll_write_vectored(cx, bufs)
1240     }
1241 }
1242 
1243 impl TryFrom<std::net::TcpStream> for TcpStream {
1244     type Error = io::Error;
1245 
1246     /// Consumes stream, returning the tokio I/O object.
1247     ///
1248     /// This is equivalent to
1249     /// [`TcpStream::from_std(stream)`](TcpStream::from_std).
try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error>1250     fn try_from(stream: std::net::TcpStream) -> Result<Self, Self::Error> {
1251         Self::from_std(stream)
1252     }
1253 }
1254 
1255 // ===== impl Read / Write =====
1256 
1257 impl AsyncRead for TcpStream {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1258     fn poll_read(
1259         self: Pin<&mut Self>,
1260         cx: &mut Context<'_>,
1261         buf: &mut ReadBuf<'_>,
1262     ) -> Poll<io::Result<()>> {
1263         self.poll_read_priv(cx, buf)
1264     }
1265 }
1266 
1267 impl AsyncWrite for TcpStream {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>1268     fn poll_write(
1269         self: Pin<&mut Self>,
1270         cx: &mut Context<'_>,
1271         buf: &[u8],
1272     ) -> Poll<io::Result<usize>> {
1273         self.poll_write_priv(cx, buf)
1274     }
1275 
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>1276     fn poll_write_vectored(
1277         self: Pin<&mut Self>,
1278         cx: &mut Context<'_>,
1279         bufs: &[io::IoSlice<'_>],
1280     ) -> Poll<io::Result<usize>> {
1281         self.poll_write_vectored_priv(cx, bufs)
1282     }
1283 
is_write_vectored(&self) -> bool1284     fn is_write_vectored(&self) -> bool {
1285         true
1286     }
1287 
1288     #[inline]
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1289     fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1290         // tcp flush is a no-op
1291         Poll::Ready(Ok(()))
1292     }
1293 
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>1294     fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
1295         self.shutdown_std(std::net::Shutdown::Write)?;
1296         Poll::Ready(Ok(()))
1297     }
1298 }
1299 
1300 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result1301     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1302         self.io.fmt(f)
1303     }
1304 }
1305 
1306 #[cfg(unix)]
1307 mod sys {
1308     use super::TcpStream;
1309     use std::os::unix::prelude::*;
1310 
1311     impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd1312         fn as_raw_fd(&self) -> RawFd {
1313             self.io.as_raw_fd()
1314         }
1315     }
1316 }
1317 
#[cfg(windows)]
mod sys {
    use super::TcpStream;
    use std::os::windows::io::{AsRawSocket, RawSocket};

    impl AsRawSocket for TcpStream {
        fn as_raw_socket(&self) -> RawSocket {
            // Report the socket handle of the inner I/O object; ownership of
            // the handle is not transferred.
            let inner = &self.io;
            inner.as_raw_socket()
        }
    }
}
1329