1 use async_stream::stream;
2
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_util::pin_mut;
5 use futures_util::stream::StreamExt;
6 use tokio::sync::mpsc;
7 use tokio_test::assert_ok;
8
9 #[tokio::test]
noop_stream()10 async fn noop_stream() {
11 let s = stream! {};
12 pin_mut!(s);
13
14 while let Some(_) = s.next().await {
15 unreachable!();
16 }
17 }
18
19 #[tokio::test]
empty_stream()20 async fn empty_stream() {
21 let mut ran = false;
22
23 {
24 let r = &mut ran;
25 let s = stream! {
26 *r = true;
27 println!("hello world!");
28 };
29 pin_mut!(s);
30
31 while let Some(_) = s.next().await {
32 unreachable!();
33 }
34 }
35
36 assert!(ran);
37 }
38
39 #[tokio::test]
yield_single_value()40 async fn yield_single_value() {
41 let s = stream! {
42 yield "hello";
43 };
44
45 let values: Vec<_> = s.collect().await;
46
47 assert_eq!(1, values.len());
48 assert_eq!("hello", values[0]);
49 }
50
51 #[tokio::test]
fused()52 async fn fused() {
53 let s = stream! {
54 yield "hello";
55 };
56 pin_mut!(s);
57
58 assert!(!s.is_terminated());
59 assert_eq!(s.next().await, Some("hello"));
60 assert_eq!(s.next().await, None);
61
62 assert!(s.is_terminated());
63 // This should return None from now on
64 assert_eq!(s.next().await, None);
65 }
66
67 #[tokio::test]
yield_multi_value()68 async fn yield_multi_value() {
69 let s = stream! {
70 yield "hello";
71 yield "world";
72 yield "dizzy";
73 };
74
75 let values: Vec<_> = s.collect().await;
76
77 assert_eq!(3, values.len());
78 assert_eq!("hello", values[0]);
79 assert_eq!("world", values[1]);
80 assert_eq!("dizzy", values[2]);
81 }
82
83 #[tokio::test]
return_stream()84 async fn return_stream() {
85 fn build_stream() -> impl Stream<Item = u32> {
86 stream! {
87 yield 1;
88 yield 2;
89 yield 3;
90 }
91 }
92
93 let s = build_stream();
94
95 let values: Vec<_> = s.collect().await;
96 assert_eq!(3, values.len());
97 assert_eq!(1, values[0]);
98 assert_eq!(2, values[1]);
99 assert_eq!(3, values[2]);
100 }
101
102 #[tokio::test]
consume_channel()103 async fn consume_channel() {
104 let (mut tx, mut rx) = mpsc::channel(10);
105
106 let s = stream! {
107 while let Some(v) = rx.recv().await {
108 yield v;
109 }
110 };
111
112 pin_mut!(s);
113
114 for i in 0..3 {
115 assert_ok!(tx.send(i).await);
116 assert_eq!(Some(i), s.next().await);
117 }
118
119 drop(tx);
120 assert_eq!(None, s.next().await);
121 }
122
123 #[tokio::test]
borrow_self()124 async fn borrow_self() {
125 struct Data(String);
126
127 impl Data {
128 fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
129 stream! {
130 yield &self.0[..];
131 }
132 }
133 }
134
135 let data = Data("hello".to_string());
136 let s = data.stream();
137 pin_mut!(s);
138
139 assert_eq!(Some("hello"), s.next().await);
140 }
141
142 #[tokio::test]
stream_in_stream()143 async fn stream_in_stream() {
144 let s = stream! {
145 let s = stream! {
146 for i in 0..3 {
147 yield i;
148 }
149 };
150
151 pin_mut!(s);
152 while let Some(v) = s.next().await {
153 yield v;
154 }
155 };
156
157 let values: Vec<_> = s.collect().await;
158 assert_eq!(3, values.len());
159 }
160
/// Runs the `trybuild` compile-fail cases matching `tests/ui/*.rs`.
#[test]
fn test() {
    trybuild::TestCases::new().compile_fail("tests/ui/*.rs");
}
166