• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Asynchronous streams.
2 
3 use core::ops::DerefMut;
4 use core::pin::Pin;
5 use core::task::{Context, Poll};
6 
7 /// An owned dynamically typed [`Stream`] for use in cases where you can't
8 /// statically type your result or need to add some indirection.
9 #[cfg(feature = "alloc")]
10 pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>;
11 
12 /// `BoxStream`, but without the `Send` requirement.
13 #[cfg(feature = "alloc")]
14 pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>;
15 
16 /// A stream of values produced asynchronously.
17 ///
18 /// If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item
19 /// = T>` is an asynchronous version of `Iterator<Item = T>`. A stream
20 /// represents a sequence of value-producing events that occur asynchronously to
21 /// the caller.
22 ///
23 /// The trait is modeled after `Future`, but allows `poll_next` to be called
24 /// even after a value has been produced, yielding `None` once the stream has
25 /// been fully exhausted.
26 #[must_use = "streams do nothing unless polled"]
27 pub trait Stream {
28     /// Values yielded by the stream.
29     type Item;
30 
31     /// Attempt to pull out the next value of this stream, registering the
32     /// current task for wakeup if the value is not yet available, and returning
33     /// `None` if the stream is exhausted.
34     ///
35     /// # Return value
36     ///
37     /// There are several possible return values, each indicating a distinct
38     /// stream state:
39     ///
40     /// - `Poll::Pending` means that this stream's next value is not ready
41     /// yet. Implementations will ensure that the current task will be notified
42     /// when the next value may be ready.
43     ///
44     /// - `Poll::Ready(Some(val))` means that the stream has successfully
45     /// produced a value, `val`, and may produce further values on subsequent
46     /// `poll_next` calls.
47     ///
48     /// - `Poll::Ready(None)` means that the stream has terminated, and
49     /// `poll_next` should not be invoked again.
50     ///
51     /// # Panics
52     ///
53     /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
54     /// `poll_next` method again may panic, block forever, or cause other kinds of
55     /// problems; the `Stream` trait places no requirements on the effects of
56     /// such a call. However, as the `poll_next` method is not marked `unsafe`,
57     /// Rust's usual rules apply: calls must never cause undefined behavior
58     /// (memory corruption, incorrect use of `unsafe` functions, or the like),
59     /// regardless of the stream's state.
60     ///
61     /// If this is difficult to guard against then the [`fuse`] adapter can be used
62     /// to ensure that `poll_next` always returns `Ready(None)` in subsequent
63     /// calls.
64     ///
65     /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>66     fn poll_next(
67         self: Pin<&mut Self>,
68         cx: &mut Context<'_>,
69     ) -> Poll<Option<Self::Item>>;
70 
71     /// Returns the bounds on the remaining length of the stream.
72     ///
73     /// Specifically, `size_hint()` returns a tuple where the first element
74     /// is the lower bound, and the second element is the upper bound.
75     ///
76     /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
77     /// A [`None`] here means that either there is no known upper bound, or the
78     /// upper bound is larger than [`usize`].
79     ///
80     /// # Implementation notes
81     ///
82     /// It is not enforced that a stream implementation yields the declared
83     /// number of elements. A buggy stream may yield less than the lower bound
84     /// or more than the upper bound of elements.
85     ///
86     /// `size_hint()` is primarily intended to be used for optimizations such as
87     /// reserving space for the elements of the stream, but must not be
88     /// trusted to e.g., omit bounds checks in unsafe code. An incorrect
89     /// implementation of `size_hint()` should not lead to memory safety
90     /// violations.
91     ///
92     /// That said, the implementation should provide a correct estimation,
93     /// because otherwise it would be a violation of the trait's protocol.
94     ///
95     /// The default implementation returns `(0, `[`None`]`)` which is correct for any
96     /// stream.
97     #[inline]
size_hint(&self) -> (usize, Option<usize>)98     fn size_hint(&self) -> (usize, Option<usize>) {
99         (0, None)
100     }
101 }
102 
103 impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
104     type Item = S::Item;
105 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>106     fn poll_next(
107         mut self: Pin<&mut Self>,
108         cx: &mut Context<'_>,
109     ) -> Poll<Option<Self::Item>> {
110         S::poll_next(Pin::new(&mut **self), cx)
111     }
112 
size_hint(&self) -> (usize, Option<usize>)113     fn size_hint(&self) -> (usize, Option<usize>) {
114         (**self).size_hint()
115     }
116 }
117 
118 impl<P> Stream for Pin<P>
119 where
120     P: DerefMut + Unpin,
121     P::Target: Stream,
122 {
123     type Item = <P::Target as Stream>::Item;
124 
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>125     fn poll_next(
126         self: Pin<&mut Self>,
127         cx: &mut Context<'_>,
128     ) -> Poll<Option<Self::Item>> {
129         self.get_mut().as_mut().poll_next(cx)
130     }
131 
size_hint(&self) -> (usize, Option<usize>)132     fn size_hint(&self) -> (usize, Option<usize>) {
133         (**self).size_hint()
134     }
135 }
136 
137 /// A stream which tracks whether or not the underlying stream
138 /// should no longer be polled.
139 ///
140 /// `is_terminated` will return `true` if a future should no longer be polled.
141 /// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
142 /// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
143 /// stream has become inactive and can no longer make progress and should be
144 /// ignored or dropped rather than being polled again.
145 pub trait FusedStream: Stream {
146     /// Returns `true` if the stream should no longer be polled.
is_terminated(&self) -> bool147     fn is_terminated(&self) -> bool;
148 }
149 
150 impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
is_terminated(&self) -> bool151     fn is_terminated(&self) -> bool {
152         <F as FusedStream>::is_terminated(&**self)
153     }
154 }
155 
156 impl<P> FusedStream for Pin<P>
157 where
158     P: DerefMut + Unpin,
159     P::Target: FusedStream,
160 {
is_terminated(&self) -> bool161     fn is_terminated(&self) -> bool {
162         <P::Target as FusedStream>::is_terminated(&**self)
163     }
164 }
165 
166 mod private_try_stream {
167     use super::Stream;
168 
169     pub trait Sealed {}
170 
171     impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
172 }
173 
174 /// A convenience for streams that return `Result` values that includes
175 /// a variety of adapters tailored to such futures.
176 pub trait TryStream: Stream + private_try_stream::Sealed {
177     /// The type of successful values yielded by this future
178     type Ok;
179 
180     /// The type of failures yielded by this future
181     type Error;
182 
183     /// Poll this `TryStream` as if it were a `Stream`.
184     ///
185     /// This method is a stopgap for a compiler limitation that prevents us from
186     /// directly inheriting from the `Stream` trait; in the future it won't be
187     /// needed.
try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Ok, Self::Error>>>188     fn try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
189         -> Poll<Option<Result<Self::Ok, Self::Error>>>;
190 }
191 
192 impl<S, T, E> TryStream for S
193     where S: ?Sized + Stream<Item = Result<T, E>>
194 {
195     type Ok = T;
196     type Error = E;
197 
try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Ok, Self::Error>>>198     fn try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
199         -> Poll<Option<Result<Self::Ok, Self::Error>>>
200     {
201         self.poll_next(cx)
202     }
203 }
204 
205 #[cfg(feature = "alloc")]
206 mod if_alloc {
207     use alloc::boxed::Box;
208     use super::*;
209 
210     impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
211         type Item = S::Item;
212 
poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>213         fn poll_next(
214             mut self: Pin<&mut Self>,
215             cx: &mut Context<'_>,
216         ) -> Poll<Option<Self::Item>> {
217             Pin::new(&mut **self).poll_next(cx)
218         }
219 
size_hint(&self) -> (usize, Option<usize>)220         fn size_hint(&self) -> (usize, Option<usize>) {
221             (**self).size_hint()
222         }
223     }
224 
225     #[cfg(feature = "std")]
226     impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
227         type Item = S::Item;
228 
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<S::Item>>229         fn poll_next(
230             self: Pin<&mut Self>,
231             cx: &mut Context<'_>,
232         ) -> Poll<Option<S::Item>> {
233             unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
234         }
235 
size_hint(&self) -> (usize, Option<usize>)236         fn size_hint(&self) -> (usize, Option<usize>) {
237             self.0.size_hint()
238         }
239     }
240 
241     impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> {
is_terminated(&self) -> bool242         fn is_terminated(&self) -> bool {
243             <S as FusedStream>::is_terminated(&**self)
244         }
245     }
246 }
247