1 use crate::Stream; 2 3 use core::future::Future; 4 use core::marker::PhantomPinned; 5 use core::pin::Pin; 6 use core::task::{Context, Poll}; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Future for the [`any`](super::StreamExt::any) method. 11 #[derive(Debug)] 12 #[must_use = "futures do nothing unless you `.await` or poll them"] 13 pub struct AnyFuture<'a, St: ?Sized, F> { 14 stream: &'a mut St, 15 f: F, 16 // Make this future `!Unpin` for compatibility with async trait methods. 17 #[pin] 18 _pin: PhantomPinned, 19 } 20 } 21 22 impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> { new(stream: &'a mut St, f: F) -> Self23 pub(super) fn new(stream: &'a mut St, f: F) -> Self { 24 Self { 25 stream, 26 f, 27 _pin: PhantomPinned, 28 } 29 } 30 } 31 32 impl<St, F> Future for AnyFuture<'_, St, F> 33 where 34 St: ?Sized + Stream + Unpin, 35 F: FnMut(St::Item) -> bool, 36 { 37 type Output = bool; 38 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>39 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 40 let me = self.project(); 41 let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx)); 42 43 match next { 44 Some(v) => { 45 if (me.f)(v) { 46 Poll::Ready(true) 47 } else { 48 cx.waker().wake_by_ref(); 49 Poll::Pending 50 } 51 } 52 None => Poll::Ready(false), 53 } 54 } 55 } 56