1 use crate::Stream; 2 use std::pin::Pin; 3 use std::task::{Context, Poll}; 4 use tokio::sync::mpsc::Receiver; 5 6 /// A wrapper around [`tokio::sync::mpsc::Receiver`] that implements [`Stream`]. 7 /// 8 /// [`tokio::sync::mpsc::Receiver`]: struct@tokio::sync::mpsc::Receiver 9 /// [`Stream`]: trait@crate::Stream 10 #[derive(Debug)] 11 pub struct ReceiverStream<T> { 12 inner: Receiver<T>, 13 } 14 15 impl<T> ReceiverStream<T> { 16 /// Create a new `ReceiverStream`. new(recv: Receiver<T>) -> Self17 pub fn new(recv: Receiver<T>) -> Self { 18 Self { inner: recv } 19 } 20 21 /// Get back the inner `Receiver`. into_inner(self) -> Receiver<T>22 pub fn into_inner(self) -> Receiver<T> { 23 self.inner 24 } 25 26 /// Closes the receiving half of a channel without dropping it. 27 /// 28 /// This prevents any further messages from being sent on the channel while 29 /// still enabling the receiver to drain messages that are buffered. Any 30 /// outstanding [`Permit`] values will still be able to send messages. 31 /// 32 /// To guarantee no messages are dropped, after calling `close()`, you must 33 /// receive all items from the stream until `None` is returned. 34 /// 35 /// [`Permit`]: struct@tokio::sync::mpsc::Permit close(&mut self)36 pub fn close(&mut self) { 37 self.inner.close() 38 } 39 } 40 41 impl<T> Stream for ReceiverStream<T> { 42 type Item = T; 43 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>44 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 45 self.inner.poll_recv(cx) 46 } 47 } 48 49 impl<T> AsRef<Receiver<T>> for ReceiverStream<T> { as_ref(&self) -> &Receiver<T>50 fn as_ref(&self) -> &Receiver<T> { 51 &self.inner 52 } 53 } 54 55 impl<T> AsMut<Receiver<T>> for ReceiverStream<T> { as_mut(&mut self) -> &mut Receiver<T>56 fn as_mut(&mut self) -> &mut Receiver<T> { 57 &mut self.inner 58 } 59 } 60