• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::pin::Pin;
2 use tokio::sync::watch::Receiver;
3 
4 use futures_core::Stream;
5 use tokio_util::sync::ReusableBoxFuture;
6 
7 use std::fmt;
8 use std::task::{Context, Poll};
9 use tokio::sync::watch::error::RecvError;
10 
11 /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
12 ///
13 /// This stream will always start by yielding the current value when the WatchStream is polled,
/// regardless of whether it was the initial value or sent afterwards.
15 ///
16 /// # Examples
17 ///
18 /// ```
19 /// # #[tokio::main]
20 /// # async fn main() {
21 /// use tokio_stream::{StreamExt, wrappers::WatchStream};
22 /// use tokio::sync::watch;
23 ///
24 /// let (tx, rx) = watch::channel("hello");
25 /// let mut rx = WatchStream::new(rx);
26 ///
27 /// assert_eq!(rx.next().await, Some("hello"));
28 ///
29 /// tx.send("goodbye").unwrap();
30 /// assert_eq!(rx.next().await, Some("goodbye"));
31 /// # }
32 /// ```
33 ///
34 /// ```
35 /// # #[tokio::main]
36 /// # async fn main() {
37 /// use tokio_stream::{StreamExt, wrappers::WatchStream};
38 /// use tokio::sync::watch;
39 ///
40 /// let (tx, rx) = watch::channel("hello");
41 /// let mut rx = WatchStream::new(rx);
42 ///
43 /// tx.send("goodbye").unwrap();
44 /// assert_eq!(rx.next().await, Some("goodbye"));
45 /// # }
46 /// ```
47 ///
48 /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
49 /// [`Stream`]: trait@crate::Stream
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub struct WatchStream<T> {
    // Reusable boxed future whose output is the result of waiting on the
    // receiver paired with the receiver itself. The `Stream` impl polls it and,
    // once it completes, swaps in a fresh future built from the returned
    // receiver.
    inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
}
54 
make_future<T: Clone + Send + Sync>( mut rx: Receiver<T>, ) -> (Result<(), RecvError>, Receiver<T>)55 async fn make_future<T: Clone + Send + Sync>(
56     mut rx: Receiver<T>,
57 ) -> (Result<(), RecvError>, Receiver<T>) {
58     let result = rx.changed().await;
59     (result, rx)
60 }
61 
62 impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
63     /// Create a new `WatchStream`.
new(rx: Receiver<T>) -> Self64     pub fn new(rx: Receiver<T>) -> Self {
65         Self {
66             inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
67         }
68     }
69 }
70 
71 impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
72     type Item = T;
73 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>74     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75         let (result, rx) = ready!(self.inner.poll(cx));
76         match result {
77             Ok(_) => {
78                 let received = (*rx.borrow()).clone();
79                 self.inner.set(make_future(rx));
80                 Poll::Ready(Some(received))
81             }
82             Err(_) => {
83                 self.inner.set(make_future(rx));
84                 Poll::Ready(None)
85             }
86         }
87     }
88 }
89 
// Explicitly mark the stream as `Unpin` for every `T`: its only field is the
// `ReusableBoxFuture`, so no `T` is stored inline in the struct itself.
impl<T> Unpin for WatchStream<T> {}
91 
92 impl<T> fmt::Debug for WatchStream<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result93     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94         f.debug_struct("WatchStream").finish()
95     }
96 }
97