#![cfg(feature = "sync")]

use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;

/// Regression test: a `"goodbye"` value sent exactly once on a watch channel
/// must not be observed two or more times by a consumer of the `WatchStream`.
///
/// # Panics
///
/// Panics with `"too many goodbyes"` (surfaced through the awaited task) if
/// the mapped stream sees `"goodbye"` a second time.
#[tokio::test]
async fn message_not_twice() {
    let (sender, receiver) = watch::channel("hello");

    // Tally of "goodbye" payloads seen so far; moved into the closure below.
    let mut goodbyes_seen = 0;
    let mut observed = WatchStream::new(receiver).map(move |msg| {
        println!("{}", msg);
        if msg == "goodbye" {
            goodbyes_seen += 1;
        }
        assert!(goodbyes_seen < 2, "too many goodbyes");
    });

    // Drain the stream on a separate task until it reports `None`.
    let drain = tokio::spawn(async move { while let Some(()) = observed.next().await {} });

    // Send goodbye just once
    sender.send("goodbye").unwrap();

    // Drop the sender before joining; the drain loop only exits once the
    // stream yields `None`, and `unwrap` re-raises any panic from the task.
    drop(sender);
    drain.await.unwrap();
}