1 use std::pin::Pin;
2 use tokio::sync::watch::Receiver;
3
4 use futures_core::Stream;
5 use tokio_util::sync::ReusableBoxFuture;
6
7 use std::fmt;
8 use std::task::{Context, Poll};
9 use tokio::sync::watch::error::RecvError;
10
11 /// A wrapper around [`tokio::sync::watch::Receiver`] that implements [`Stream`].
12 ///
13 /// This stream will always start by yielding the current value when the WatchStream is polled,
14 /// regardles of whether it was the initial value or sent afterwards.
15 ///
16 /// # Examples
17 ///
18 /// ```
19 /// # #[tokio::main]
20 /// # async fn main() {
21 /// use tokio_stream::{StreamExt, wrappers::WatchStream};
22 /// use tokio::sync::watch;
23 ///
24 /// let (tx, rx) = watch::channel("hello");
25 /// let mut rx = WatchStream::new(rx);
26 ///
27 /// assert_eq!(rx.next().await, Some("hello"));
28 ///
29 /// tx.send("goodbye").unwrap();
30 /// assert_eq!(rx.next().await, Some("goodbye"));
31 /// # }
32 /// ```
33 ///
34 /// ```
35 /// # #[tokio::main]
36 /// # async fn main() {
37 /// use tokio_stream::{StreamExt, wrappers::WatchStream};
38 /// use tokio::sync::watch;
39 ///
40 /// let (tx, rx) = watch::channel("hello");
41 /// let mut rx = WatchStream::new(rx);
42 ///
43 /// tx.send("goodbye").unwrap();
44 /// assert_eq!(rx.next().await, Some("goodbye"));
45 /// # }
46 /// ```
47 ///
48 /// [`tokio::sync::watch::Receiver`]: struct@tokio::sync::watch::Receiver
49 /// [`Stream`]: trait@crate::Stream
50 #[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
51 pub struct WatchStream<T> {
52 inner: ReusableBoxFuture<(Result<(), RecvError>, Receiver<T>)>,
53 }
54
make_future<T: Clone + Send + Sync>( mut rx: Receiver<T>, ) -> (Result<(), RecvError>, Receiver<T>)55 async fn make_future<T: Clone + Send + Sync>(
56 mut rx: Receiver<T>,
57 ) -> (Result<(), RecvError>, Receiver<T>) {
58 let result = rx.changed().await;
59 (result, rx)
60 }
61
62 impl<T: 'static + Clone + Unpin + Send + Sync> WatchStream<T> {
63 /// Create a new `WatchStream`.
new(rx: Receiver<T>) -> Self64 pub fn new(rx: Receiver<T>) -> Self {
65 Self {
66 inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
67 }
68 }
69 }
70
71 impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
72 type Item = T;
73
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>74 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75 let (result, rx) = ready!(self.inner.poll(cx));
76 match result {
77 Ok(_) => {
78 let received = (*rx.borrow()).clone();
79 self.inner.set(make_future(rx));
80 Poll::Ready(Some(received))
81 }
82 Err(_) => {
83 self.inner.set(make_future(rx));
84 Poll::Ready(None)
85 }
86 }
87 }
88 }
89
90 impl<T> Unpin for WatchStream<T> {}
91
92 impl<T> fmt::Debug for WatchStream<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result93 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
94 f.debug_struct("WatchStream").finish()
95 }
96 }
97