1 //! Asynchronous streams. 2 3 use core::ops::DerefMut; 4 use core::pin::Pin; 5 use core::task::{Context, Poll}; 6 7 /// An owned dynamically typed [`Stream`] for use in cases where you can't 8 /// statically type your result or need to add some indirection. 9 #[cfg(feature = "alloc")] 10 pub type BoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + Send + 'a>>; 11 12 /// `BoxStream`, but without the `Send` requirement. 13 #[cfg(feature = "alloc")] 14 pub type LocalBoxStream<'a, T> = Pin<alloc::boxed::Box<dyn Stream<Item = T> + 'a>>; 15 16 /// A stream of values produced asynchronously. 17 /// 18 /// If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item 19 /// = T>` is an asynchronous version of `Iterator<Item = T>`. A stream 20 /// represents a sequence of value-producing events that occur asynchronously to 21 /// the caller. 22 /// 23 /// The trait is modeled after `Future`, but allows `poll_next` to be called 24 /// even after a value has been produced, yielding `None` once the stream has 25 /// been fully exhausted. 26 #[must_use = "streams do nothing unless polled"] 27 pub trait Stream { 28 /// Values yielded by the stream. 29 type Item; 30 31 /// Attempt to pull out the next value of this stream, registering the 32 /// current task for wakeup if the value is not yet available, and returning 33 /// `None` if the stream is exhausted. 34 /// 35 /// # Return value 36 /// 37 /// There are several possible return values, each indicating a distinct 38 /// stream state: 39 /// 40 /// - `Poll::Pending` means that this stream's next value is not ready 41 /// yet. Implementations will ensure that the current task will be notified 42 /// when the next value may be ready. 43 /// 44 /// - `Poll::Ready(Some(val))` means that the stream has successfully 45 /// produced a value, `val`, and may produce further values on subsequent 46 /// `poll_next` calls. 47 /// 48 /// - `Poll::Ready(None)` means that the stream has terminated, and 49 /// `poll_next` should not be invoked again. 50 /// 51 /// # Panics 52 /// 53 /// Once a stream has finished (returned `Ready(None)` from `poll_next`), calling its 54 /// `poll_next` method again may panic, block forever, or cause other kinds of 55 /// problems; the `Stream` trait places no requirements on the effects of 56 /// such a call. However, as the `poll_next` method is not marked `unsafe`, 57 /// Rust's usual rules apply: calls must never cause undefined behavior 58 /// (memory corruption, incorrect use of `unsafe` functions, or the like), 59 /// regardless of the stream's state. 60 /// 61 /// If this is difficult to guard against then the [`fuse`] adapter can be used 62 /// to ensure that `poll_next` always returns `Ready(None)` in subsequent 63 /// calls. 64 /// 65 /// [`fuse`]: https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html#method.fuse poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>66 fn poll_next( 67 self: Pin<&mut Self>, 68 cx: &mut Context<'_>, 69 ) -> Poll<Option<Self::Item>>; 70 71 /// Returns the bounds on the remaining length of the stream. 72 /// 73 /// Specifically, `size_hint()` returns a tuple where the first element 74 /// is the lower bound, and the second element is the upper bound. 75 /// 76 /// The second half of the tuple that is returned is an [`Option`]`<`[`usize`]`>`. 77 /// A [`None`] here means that either there is no known upper bound, or the 78 /// upper bound is larger than [`usize`]. 79 /// 80 /// # Implementation notes 81 /// 82 /// It is not enforced that a stream implementation yields the declared 83 /// number of elements. A buggy stream may yield less than the lower bound 84 /// or more than the upper bound of elements. 85 /// 86 /// `size_hint()` is primarily intended to be used for optimizations such as 87 /// reserving space for the elements of the stream, but must not be 88 /// trusted to e.g., omit bounds checks in unsafe code. An incorrect 89 /// implementation of `size_hint()` should not lead to memory safety 90 /// violations. 91 /// 92 /// That said, the implementation should provide a correct estimation, 93 /// because otherwise it would be a violation of the trait's protocol. 94 /// 95 /// The default implementation returns `(0, `[`None`]`)` which is correct for any 96 /// stream. 97 #[inline] size_hint(&self) -> (usize, Option<usize>)98 fn size_hint(&self) -> (usize, Option<usize>) { 99 (0, None) 100 } 101 } 102 103 impl<S: ?Sized + Stream + Unpin> Stream for &mut S { 104 type Item = S::Item; 105 poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>106 fn poll_next( 107 mut self: Pin<&mut Self>, 108 cx: &mut Context<'_>, 109 ) -> Poll<Option<Self::Item>> { 110 S::poll_next(Pin::new(&mut **self), cx) 111 } 112 size_hint(&self) -> (usize, Option<usize>)113 fn size_hint(&self) -> (usize, Option<usize>) { 114 (**self).size_hint() 115 } 116 } 117 118 impl<P> Stream for Pin<P> 119 where 120 P: DerefMut + Unpin, 121 P::Target: Stream, 122 { 123 type Item = <P::Target as Stream>::Item; 124 poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>125 fn poll_next( 126 self: Pin<&mut Self>, 127 cx: &mut Context<'_>, 128 ) -> Poll<Option<Self::Item>> { 129 self.get_mut().as_mut().poll_next(cx) 130 } 131 size_hint(&self) -> (usize, Option<usize>)132 fn size_hint(&self) -> (usize, Option<usize>) { 133 (**self).size_hint() 134 } 135 } 136 137 /// A stream which tracks whether or not the underlying stream 138 /// should no longer be polled. 139 /// 140 /// `is_terminated` will return `true` if a future should no longer be polled. 141 /// Usually, this state occurs after `poll_next` (or `try_poll_next`) returned 142 /// `Poll::Ready(None)`. However, `is_terminated` may also return `true` if a 143 /// stream has become inactive and can no longer make progress and should be 144 /// ignored or dropped rather than being polled again. 145 pub trait FusedStream: Stream { 146 /// Returns `true` if the stream should no longer be polled. is_terminated(&self) -> bool147 fn is_terminated(&self) -> bool; 148 } 149 150 impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F { is_terminated(&self) -> bool151 fn is_terminated(&self) -> bool { 152 <F as FusedStream>::is_terminated(&**self) 153 } 154 } 155 156 impl<P> FusedStream for Pin<P> 157 where 158 P: DerefMut + Unpin, 159 P::Target: FusedStream, 160 { is_terminated(&self) -> bool161 fn is_terminated(&self) -> bool { 162 <P::Target as FusedStream>::is_terminated(&**self) 163 } 164 } 165 166 mod private_try_stream { 167 use super::Stream; 168 169 pub trait Sealed {} 170 171 impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {} 172 } 173 174 /// A convenience for streams that return `Result` values that includes 175 /// a variety of adapters tailored to such futures. 176 pub trait TryStream: Stream + private_try_stream::Sealed { 177 /// The type of successful values yielded by this future 178 type Ok; 179 180 /// The type of failures yielded by this future 181 type Error; 182 183 /// Poll this `TryStream` as if it were a `Stream`. 184 /// 185 /// This method is a stopgap for a compiler limitation that prevents us from 186 /// directly inheriting from the `Stream` trait; in the future it won't be 187 /// needed. try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Ok, Self::Error>>>188 fn try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) 189 -> Poll<Option<Result<Self::Ok, Self::Error>>>; 190 } 191 192 impl<S, T, E> TryStream for S 193 where S: ?Sized + Stream<Item = Result<T, E>> 194 { 195 type Ok = T; 196 type Error = E; 197 try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<Self::Ok, Self::Error>>>198 fn try_poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) 199 -> Poll<Option<Result<Self::Ok, Self::Error>>> 200 { 201 self.poll_next(cx) 202 } 203 } 204 205 #[cfg(feature = "alloc")] 206 mod if_alloc { 207 use alloc::boxed::Box; 208 use super::*; 209 210 impl<S: ?Sized + Stream + Unpin> Stream for Box<S> { 211 type Item = S::Item; 212 poll_next( mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>213 fn poll_next( 214 mut self: Pin<&mut Self>, 215 cx: &mut Context<'_>, 216 ) -> Poll<Option<Self::Item>> { 217 Pin::new(&mut **self).poll_next(cx) 218 } 219 size_hint(&self) -> (usize, Option<usize>)220 fn size_hint(&self) -> (usize, Option<usize>) { 221 (**self).size_hint() 222 } 223 } 224 225 #[cfg(feature = "std")] 226 impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> { 227 type Item = S::Item; 228 poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<S::Item>>229 fn poll_next( 230 self: Pin<&mut Self>, 231 cx: &mut Context<'_>, 232 ) -> Poll<Option<S::Item>> { 233 unsafe { self.map_unchecked_mut(|x| &mut x.0) }.poll_next(cx) 234 } 235 size_hint(&self) -> (usize, Option<usize>)236 fn size_hint(&self) -> (usize, Option<usize>) { 237 self.0.size_hint() 238 } 239 } 240 241 impl<S: ?Sized + FusedStream + Unpin> FusedStream for Box<S> { is_terminated(&self) -> bool242 fn is_terminated(&self) -> bool { 243 <S as FusedStream>::is_terminated(&**self) 244 } 245 } 246 } 247