1 use core::pin::Pin; 2 use futures_core::ready; 3 use futures_core::stream::FusedStream; 4 use futures_core::future::{Future, FusedFuture}; 5 use futures_core::task::{Context, Poll}; 6 use crate::stream::StreamExt; 7 8 /// Future for the [`select_next_some`](super::StreamExt::select_next_some) 9 /// method. 10 #[derive(Debug)] 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct SelectNextSome<'a, St: ?Sized> { 13 stream: &'a mut St, 14 } 15 16 impl<'a, St: ?Sized> SelectNextSome<'a, St> { new(stream: &'a mut St) -> Self17 pub(super) fn new(stream: &'a mut St) -> Self { 18 Self { stream } 19 } 20 } 21 22 impl<St: ?Sized + FusedStream + Unpin> FusedFuture for SelectNextSome<'_, St> { is_terminated(&self) -> bool23 fn is_terminated(&self) -> bool { 24 self.stream.is_terminated() 25 } 26 } 27 28 impl<St: ?Sized + FusedStream + Unpin> Future for SelectNextSome<'_, St> { 29 type Output = St::Item; 30 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>31 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 32 assert!(!self.stream.is_terminated(), "SelectNextSome polled after terminated"); 33 34 if let Some(item) = ready!(self.stream.poll_next_unpin(cx)) { 35 Poll::Ready(item) 36 } else { 37 debug_assert!(self.stream.is_terminated()); 38 cx.waker().wake_by_ref(); 39 Poll::Pending 40 } 41 } 42 } 43