//! Asynchronous streams.

use core::ops::DerefMut;
use core::pin::Pin;
use core::task::{Context, Poll};

/// An owned dynamically typed [`Stream`] for use in cases where you can't
/// statically type your result or need to add some indirection.
#[cfg(feature = "alloc")]
pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>;

/// `BoxStream`, but without the `Send` requirement.
#[cfg(feature = "alloc")]
pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>;

/// A stream of values produced asynchronously.
///
/// If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item
/// = T>` is an asynchronous version of `Iterator<Item = T>`. A stream
/// represents a sequence of value-producing events that occur asynchronously to
/// the caller.
///
/// The trait is modeled after `Future`, but allows `poll_next` to be called
/// even after a value has been produced, yielding `None` once the stream has
/// been fully exhausted.
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
    /// Values yielded by the stream.
    type Item;

    /// Attempt to pull out the next value of this stream, registering the
    /// current task for wakeup if the value is not yet available, and returning
    /// `None` if the stream is exhausted.
    ///
    /// # Return value
    ///
    /// There are several possible return values, each indicating a distinct
    /// stream state:
    ///
    /// - `Poll::Pending` means that this stream's next value is not ready
    ///   yet. Implementations will ensure that the current task will be notified
    ///   when the next value may be ready.
    ///
    /// - `Poll::Ready(Some(val))` means that the stream has successfully
    ///   produced a value, `val`, and may produce further values on subsequent
    ///   `poll_next` calls.
    ///
    /// - `Poll::Ready(None)` means that the stream has terminated, and
    ///   `poll_next` should not be invoked again.
    ///
    /// # Panics
    ///
    /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its
    /// `poll_next` method again may panic, block forever, or cause other kinds of
    /// problems; the `Stream` trait places no requirements on the effects of
    /// such a call. However, as the `poll_next` method is not marked `unsafe`,
    /// Rust's usual rules apply: calls must never cause undefined behavior
    /// (memory corruption, incorrect use of `unsafe` functions, or the like),
    /// regardless of the stream's state.
    ///
    /// If this is difficult to guard against then the [`fuse`] adapter can be used
    /// to ensure that `poll_next` always returns `Ready(None)` in subsequent
    /// calls.
    ///
    /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;

    /// Returns the bounds on the remaining length of the stream.
    ///
    /// Specifically, `size_hint()` returns a tuple where the first element
    /// is the lower bound, and the second element is the upper bound.
    ///
    /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`.
    /// A [`None`] here means that either there is no known upper bound, or the
    /// upper bound is larger than [`usize`].
    ///
    /// # Implementation notes
    ///
    /// It is not enforced that a stream implementation yields the declared
    /// number of elements. A buggy stream may yield less than the lower bound
    /// or more than the upper bound of elements.
    ///
    /// `size_hint()` is primarily intended to be used for optimizations such as
    /// reserving space for the elements of the stream, but must not be
    /// trusted to e.g., omit bounds checks in unsafe code. An incorrect
    /// implementation of `size_hint()` should not lead to memory safety
    /// violations.
    ///
    /// That said, the implementation should provide a correct estimation,
    /// because otherwise it would be a violation of the trait's protocol.
    ///
    /// The default implementation returns `(0, `[`None`]`)` which is correct for any
    /// stream.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        (0, None)
    }
}

/// Forwards to the referenced stream, so a `&mut S` can be polled without
/// giving up ownership of `S`.
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
    type Item = S::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `S: Unpin`, so re-pinning the reborrowed target with `Pin::new` is safe.
        S::poll_next(Pin::new(&mut **self), cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (**self).size_hint()
    }
}

/// Forwards to the stream behind a pinned pointer.
impl<P> Stream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: Stream,
{
    type Item = <P::Target as Stream>::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `P: Unpin` makes `Pin<P>: Unpin`, so `get_mut` is available; `as_mut`
        // then reborrows the inner pointer as `Pin<&mut P::Target>`.
        self.get_mut().as_mut().poll_next(cx)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (**self).size_hint()
    }
}

/// A stream which tracks whether or not the underlying stream
/// should no longer be polled.
///
/// `is_terminated` will return `true` if a stream should no longer be polled.
/// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned
/// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a
/// stream has become inactive and can no longer make progress and should be
/// ignored or dropped rather than being polled again.
pub trait FusedStream: Stream {
    /// Returns `true` if the stream should no longer be polled.
    fn is_terminated(&self) -> bool;
}

/// Forwards to the referenced stream.
impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
    fn is_terminated(&self) -> bool {
        <F as FusedStream>::is_terminated(&**self)
    }
}

/// Forwards to the stream behind a pinned pointer.
impl<P> FusedStream for Pin<P>
where
    P: DerefMut + Unpin,
    P::Target: FusedStream,
{
    fn is_terminated(&self) -> bool {
        <P::Target as FusedStream>::is_terminated(&**self)
    }
}

// Sealing module: `Sealed` is a supertrait of `TryStream` and is implemented
// only for streams of `Result`, so code outside this module cannot add further
// `TryStream` implementations.
mod private_try_stream {
    use super::Stream;

    pub trait Sealed {}

    impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
}

/// A convenience for streams that return `Result` values that includes
/// a variety of adapters tailored to such streams.
pub trait TryStream: Stream + private_try_stream::Sealed {
    /// The type of successful values yielded by this stream
    type Ok;

    /// The type of failures yielded by this stream
    type Error;

    /// Poll this `TryStream` as if it were a `Stream`.
    ///
    /// This method is a stopgap for a compiler limitation that prevents us from
    /// directly inheriting from the `Stream` trait; in the future it won't be
    /// needed.
    fn try_poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Ok, Self::Error>>>;
}

/// Every stream whose items are `Result<T, E>` is a `TryStream` with
/// `Ok = T` and `Error = E`.
impl<S, T, E> TryStream for S
where
    S: ?Sized + Stream<Item = Result<T, E>>,
{
    type Ok = T;
    type Error = E;

    fn try_poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<Self::Ok, Self::Error>>> {
        self.poll_next(cx)
    }
}

#[cfg(feature = "alloc")]
mod if_alloc {
    use super::*;
    use alloc::boxed::Box;

    /// Forwards to the boxed stream.
    impl<S: ?Sized + Stream + Unpin> Stream for Box<S> {
        type Item = S::Item;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // `S: Unpin`, so re-pinning the boxed value with `Pin::new` is safe.
            Pin::new(&mut **self).poll_next(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            (**self).size_hint()
        }
    }

    /// Forwards to the wrapped stream.
    #[cfg(feature = "std")]
    impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
        type Item = S::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
            // SAFETY: this is a pin projection onto the wrapper's only field.
            // The closure just reborrows `x.0` and never moves out of it, so
            // the wrapped stream keeps its address while `self` is pinned.
            unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx)
        }

        fn size_hint(&self) -> (usize, Option<usize>) {
            self.0.size_hint()
        }
    }

    /// Forwards to the boxed stream.
    impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> {
        fn is_terminated(&self) -> bool {
            <S as FusedStream>::is_terminated(&**self)
        }
    }
}