1 use crate::fns::FnOnce1; 2 use crate::stream::{Fuse, StreamExt}; 3 use core::fmt; 4 use core::marker::PhantomData; 5 use core::pin::Pin; 6 use futures_core::future::{FusedFuture, Future}; 7 use futures_core::ready; 8 use futures_core::stream::{FusedStream, Stream}; 9 use futures_core::task::{Context, Poll}; 10 #[cfg(feature = "sink")] 11 use futures_sink::Sink; 12 use pin_project_lite::pin_project; 13 14 pin_project! { 15 /// A `Stream` that implements a `peek` method. 16 /// 17 /// The `peek` method can be used to retrieve a reference 18 /// to the next `Stream::Item` if available. A subsequent 19 /// call to `poll` will return the owned item. 20 #[derive(Debug)] 21 #[must_use = "streams do nothing unless polled"] 22 pub struct Peekable<St: Stream> { 23 #[pin] 24 stream: Fuse<St>, 25 peeked: Option<St::Item>, 26 } 27 } 28 29 impl<St: Stream> Peekable<St> { new(stream: St) -> Self30 pub(super) fn new(stream: St) -> Self { 31 Self { stream: stream.fuse(), peeked: None } 32 } 33 34 delegate_access_inner!(stream, St, (.)); 35 36 /// Produces a future which retrieves a reference to the next item 37 /// in the stream, or `None` if the underlying stream terminates. peek(self: Pin<&mut Self>) -> Peek<'_, St>38 pub fn peek(self: Pin<&mut Self>) -> Peek<'_, St> { 39 Peek { inner: Some(self) } 40 } 41 42 /// Peek retrieves a reference to the next item in the stream. 43 /// 44 /// This method polls the underlying stream and return either a reference 45 /// to the next item if the stream is ready or passes through any errors. poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>>46 pub fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<&St::Item>> { 47 let mut this = self.project(); 48 49 Poll::Ready(loop { 50 if this.peeked.is_some() { 51 break this.peeked.as_ref(); 52 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { 53 *this.peeked = Some(item); 54 } else { 55 break None; 56 } 57 }) 58 } 59 60 /// Produces a future which retrieves a mutable reference to the next item 61 /// in the stream, or `None` if the underlying stream terminates. 62 /// 63 /// # Examples 64 /// 65 /// ``` 66 /// # futures::executor::block_on(async { 67 /// use futures::stream::{self, StreamExt}; 68 /// use futures::pin_mut; 69 /// 70 /// let stream = stream::iter(vec![1, 2, 3]).peekable(); 71 /// pin_mut!(stream); 72 /// 73 /// assert_eq!(stream.as_mut().peek_mut().await, Some(&mut 1)); 74 /// assert_eq!(stream.as_mut().next().await, Some(1)); 75 /// 76 /// // Peek into the stream and modify the value which will be returned next 77 /// if let Some(p) = stream.as_mut().peek_mut().await { 78 /// if *p == 2 { 79 /// *p = 5; 80 /// } 81 /// } 82 /// 83 /// assert_eq!(stream.collect::<Vec<_>>().await, vec![5, 3]); 84 /// # }); 85 /// ``` peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St>86 pub fn peek_mut(self: Pin<&mut Self>) -> PeekMut<'_, St> { 87 PeekMut { inner: Some(self) } 88 } 89 90 /// Peek retrieves a mutable reference to the next item in the stream. poll_peek_mut( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<&mut St::Item>>91 pub fn poll_peek_mut( 92 self: Pin<&mut Self>, 93 cx: &mut Context<'_>, 94 ) -> Poll<Option<&mut St::Item>> { 95 let mut this = self.project(); 96 97 Poll::Ready(loop { 98 if this.peeked.is_some() { 99 break this.peeked.as_mut(); 100 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) { 101 *this.peeked = Some(item); 102 } else { 103 break None; 104 } 105 }) 106 } 107 108 /// Creates a future which will consume and return the next value of this 109 /// stream if a condition is true. 110 /// 111 /// If `func` returns `true` for the next value of this stream, consume and 112 /// return it. Otherwise, return `None`. 113 /// 114 /// # Examples 115 /// 116 /// Consume a number if it's equal to 0. 117 /// 118 /// ``` 119 /// # futures::executor::block_on(async { 120 /// use futures::stream::{self, StreamExt}; 121 /// use futures::pin_mut; 122 /// 123 /// let stream = stream::iter(0..5).peekable(); 124 /// pin_mut!(stream); 125 /// // The first item of the stream is 0; consume it. 126 /// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, Some(0)); 127 /// // The next item returned is now 1, so `consume` will return `false`. 128 /// assert_eq!(stream.as_mut().next_if(|&x| x == 0).await, None); 129 /// // `next_if` saves the value of the next item if it was not equal to `expected`. 130 /// assert_eq!(stream.next().await, Some(1)); 131 /// # }); 132 /// ``` 133 /// 134 /// Consume any number less than 10. 135 /// 136 /// ``` 137 /// # futures::executor::block_on(async { 138 /// use futures::stream::{self, StreamExt}; 139 /// use futures::pin_mut; 140 /// 141 /// let stream = stream::iter(1..20).peekable(); 142 /// pin_mut!(stream); 143 /// // Consume all numbers less than 10 144 /// while stream.as_mut().next_if(|&x| x < 10).await.is_some() {} 145 /// // The next value returned will be 10 146 /// assert_eq!(stream.next().await, Some(10)); 147 /// # }); 148 /// ``` next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F> where F: FnOnce(&St::Item) -> bool,149 pub fn next_if<F>(self: Pin<&mut Self>, func: F) -> NextIf<'_, St, F> 150 where 151 F: FnOnce(&St::Item) -> bool, 152 { 153 NextIf { inner: Some((self, func)) } 154 } 155 156 /// Creates a future which will consume and return the next item if it is 157 /// equal to `expected`. 158 /// 159 /// # Example 160 /// 161 /// Consume a number if it's equal to 0. 162 /// 163 /// ``` 164 /// # futures::executor::block_on(async { 165 /// use futures::stream::{self, StreamExt}; 166 /// use futures::pin_mut; 167 /// 168 /// let stream = stream::iter(0..5).peekable(); 169 /// pin_mut!(stream); 170 /// // The first item of the stream is 0; consume it. 171 /// assert_eq!(stream.as_mut().next_if_eq(&0).await, Some(0)); 172 /// // The next item returned is now 1, so `consume` will return `false`. 173 /// assert_eq!(stream.as_mut().next_if_eq(&0).await, None); 174 /// // `next_if_eq` saves the value of the next item if it was not equal to `expected`. 175 /// assert_eq!(stream.next().await, Some(1)); 176 /// # }); 177 /// ``` next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T> where T: ?Sized, St::Item: PartialEq<T>,178 pub fn next_if_eq<'a, T>(self: Pin<&'a mut Self>, expected: &'a T) -> NextIfEq<'a, St, T> 179 where 180 T: ?Sized, 181 St::Item: PartialEq<T>, 182 { 183 NextIfEq { 184 inner: NextIf { inner: Some((self, NextIfEqFn { expected, _next: PhantomData })) }, 185 } 186 } 187 } 188 189 impl<St: Stream> FusedStream for Peekable<St> { is_terminated(&self) -> bool190 fn is_terminated(&self) -> bool { 191 self.peeked.is_none() && self.stream.is_terminated() 192 } 193 } 194 195 impl<S: Stream> Stream for Peekable<S> { 196 type Item = S::Item; 197 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>198 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 199 let this = self.project(); 200 if let Some(item) = this.peeked.take() { 201 return Poll::Ready(Some(item)); 202 } 203 this.stream.poll_next(cx) 204 } 205 size_hint(&self) -> (usize, Option<usize>)206 fn size_hint(&self) -> (usize, Option<usize>) { 207 let peek_len = usize::from(self.peeked.is_some()); 208 let (lower, upper) = self.stream.size_hint(); 209 let lower = lower.saturating_add(peek_len); 210 let upper = match upper { 211 Some(x) => x.checked_add(peek_len), 212 None => None, 213 }; 214 (lower, upper) 215 } 216 } 217 218 // Forwarding impl of Sink from the underlying stream 219 #[cfg(feature = "sink")] 220 impl<S, Item> Sink<Item> for Peekable<S> 221 where 222 S: Sink<Item> + Stream, 223 { 224 type Error = S::Error; 225 226 delegate_sink!(stream, Item); 227 } 228 229 pin_project! { 230 /// Future for the [`Peekable::peek`](self::Peekable::peek) method. 231 #[must_use = "futures do nothing unless polled"] 232 pub struct Peek<'a, St: Stream> { 233 inner: Option<Pin<&'a mut Peekable<St>>>, 234 } 235 } 236 237 impl<St> fmt::Debug for Peek<'_, St> 238 where 239 St: Stream + fmt::Debug, 240 St::Item: fmt::Debug, 241 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 243 f.debug_struct("Peek").field("inner", &self.inner).finish() 244 } 245 } 246 247 impl<St: Stream> FusedFuture for Peek<'_, St> { is_terminated(&self) -> bool248 fn is_terminated(&self) -> bool { 249 self.inner.is_none() 250 } 251 } 252 253 impl<'a, St> Future for Peek<'a, St> 254 where 255 St: Stream, 256 { 257 type Output = Option<&'a St::Item>; 258 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>259 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 260 let inner = self.project().inner; 261 if let Some(peekable) = inner { 262 ready!(peekable.as_mut().poll_peek(cx)); 263 264 inner.take().unwrap().poll_peek(cx) 265 } else { 266 panic!("Peek polled after completion") 267 } 268 } 269 } 270 271 pin_project! { 272 /// Future for the [`Peekable::peek_mut`](self::Peekable::peek_mut) method. 273 #[must_use = "futures do nothing unless polled"] 274 pub struct PeekMut<'a, St: Stream> { 275 inner: Option<Pin<&'a mut Peekable<St>>>, 276 } 277 } 278 279 impl<St> fmt::Debug for PeekMut<'_, St> 280 where 281 St: Stream + fmt::Debug, 282 St::Item: fmt::Debug, 283 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result284 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 285 f.debug_struct("PeekMut").field("inner", &self.inner).finish() 286 } 287 } 288 289 impl<St: Stream> FusedFuture for PeekMut<'_, St> { is_terminated(&self) -> bool290 fn is_terminated(&self) -> bool { 291 self.inner.is_none() 292 } 293 } 294 295 impl<'a, St> Future for PeekMut<'a, St> 296 where 297 St: Stream, 298 { 299 type Output = Option<&'a mut St::Item>; 300 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>301 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 302 let inner = self.project().inner; 303 if let Some(peekable) = inner { 304 ready!(peekable.as_mut().poll_peek_mut(cx)); 305 306 inner.take().unwrap().poll_peek_mut(cx) 307 } else { 308 panic!("PeekMut polled after completion") 309 } 310 } 311 } 312 313 pin_project! { 314 /// Future for the [`Peekable::next_if`](self::Peekable::next_if) method. 315 #[must_use = "futures do nothing unless polled"] 316 pub struct NextIf<'a, St: Stream, F> { 317 inner: Option<(Pin<&'a mut Peekable<St>>, F)>, 318 } 319 } 320 321 impl<St, F> fmt::Debug for NextIf<'_, St, F> 322 where 323 St: Stream + fmt::Debug, 324 St::Item: fmt::Debug, 325 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result326 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 327 f.debug_struct("NextIf").field("inner", &self.inner.as_ref().map(|(s, _f)| s)).finish() 328 } 329 } 330 331 #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 332 impl<St, F> FusedFuture for NextIf<'_, St, F> 333 where 334 St: Stream, 335 F: for<'a> FnOnce1<&'a St::Item, Output = bool>, 336 { is_terminated(&self) -> bool337 fn is_terminated(&self) -> bool { 338 self.inner.is_none() 339 } 340 } 341 342 #[allow(single_use_lifetimes)] // https://github.com/rust-lang/rust/issues/55058 343 impl<St, F> Future for NextIf<'_, St, F> 344 where 345 St: Stream, 346 F: for<'a> FnOnce1<&'a St::Item, Output = bool>, 347 { 348 type Output = Option<St::Item>; 349 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>350 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 351 let inner = self.project().inner; 352 if let Some((peekable, _)) = inner { 353 let res = ready!(peekable.as_mut().poll_next(cx)); 354 355 let (peekable, func) = inner.take().unwrap(); 356 match res { 357 Some(ref matched) if func.call_once(matched) => Poll::Ready(res), 358 other => { 359 let peekable = peekable.project(); 360 // Since we called `self.next()`, we consumed `self.peeked`. 361 assert!(peekable.peeked.is_none()); 362 *peekable.peeked = other; 363 Poll::Ready(None) 364 } 365 } 366 } else { 367 panic!("NextIf polled after completion") 368 } 369 } 370 } 371 372 pin_project! { 373 /// Future for the [`Peekable::next_if_eq`](self::Peekable::next_if_eq) method. 374 #[must_use = "futures do nothing unless polled"] 375 pub struct NextIfEq<'a, St: Stream, T: ?Sized> { 376 #[pin] 377 inner: NextIf<'a, St, NextIfEqFn<'a, T, St::Item>>, 378 } 379 } 380 381 impl<St, T> fmt::Debug for NextIfEq<'_, St, T> 382 where 383 St: Stream + fmt::Debug, 384 St::Item: fmt::Debug, 385 T: ?Sized, 386 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result387 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 388 f.debug_struct("NextIfEq") 389 .field("inner", &self.inner.inner.as_ref().map(|(s, _f)| s)) 390 .finish() 391 } 392 } 393 394 impl<St, T> FusedFuture for NextIfEq<'_, St, T> 395 where 396 St: Stream, 397 T: ?Sized, 398 St::Item: PartialEq<T>, 399 { is_terminated(&self) -> bool400 fn is_terminated(&self) -> bool { 401 self.inner.is_terminated() 402 } 403 } 404 405 impl<St, T> Future for NextIfEq<'_, St, T> 406 where 407 St: Stream, 408 T: ?Sized, 409 St::Item: PartialEq<T>, 410 { 411 type Output = Option<St::Item>; 412 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>413 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 414 self.project().inner.poll(cx) 415 } 416 } 417 418 struct NextIfEqFn<'a, T: ?Sized, Item> { 419 expected: &'a T, 420 _next: PhantomData<Item>, 421 } 422 423 impl<T, Item> FnOnce1<&Item> for NextIfEqFn<'_, T, Item> 424 where 425 T: ?Sized, 426 Item: PartialEq<T>, 427 { 428 type Output = bool; 429 call_once(self, next: &Item) -> Self::Output430 fn call_once(self, next: &Item) -> Self::Output { 431 next == self.expected 432 } 433 } 434