• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::Stream;
2 
3 use core::future::Future;
4 use core::marker::PhantomPinned;
5 use core::pin::Pin;
6 use core::task::{Context, Poll};
7 use pin_project_lite::pin_project;
8 
9 pin_project! {
10     /// Future for the [`any`](super::StreamExt::any) method.
11     #[derive(Debug)]
12     #[must_use = "futures do nothing unless you `.await` or poll them"]
13     pub struct AnyFuture<'a, St: ?Sized, F> {
14         stream: &'a mut St,
15         f: F,
16         // Make this future `!Unpin` for compatibility with async trait methods.
17         #[pin]
18         _pin: PhantomPinned,
19     }
20 }
21 
22 impl<'a, St: ?Sized, F> AnyFuture<'a, St, F> {
new(stream: &'a mut St, f: F) -> Self23     pub(super) fn new(stream: &'a mut St, f: F) -> Self {
24         Self {
25             stream,
26             f,
27             _pin: PhantomPinned,
28         }
29     }
30 }
31 
32 impl<St, F> Future for AnyFuture<'_, St, F>
33 where
34     St: ?Sized + Stream + Unpin,
35     F: FnMut(St::Item) -> bool,
36 {
37     type Output = bool;
38 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>39     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
40         let me = self.project();
41         let next = futures_core::ready!(Pin::new(me.stream).poll_next(cx));
42 
43         match next {
44             Some(v) => {
45                 if (me.f)(v) {
46                     Poll::Ready(true)
47                 } else {
48                     cx.waker().wake_by_ref();
49                     Poll::Pending
50                 }
51             }
52             None => Poll::Ready(false),
53         }
54     }
55 }
56