• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use async_stream::stream;
2 
3 use futures_core::stream::{FusedStream, Stream};
4 use futures_util::pin_mut;
5 use futures_util::stream::StreamExt;
6 use tokio::sync::mpsc;
7 use tokio_test::assert_ok;
8 
9 #[tokio::test]
noop_stream()10 async fn noop_stream() {
11     let s = stream! {};
12     pin_mut!(s);
13 
14     while let Some(_) = s.next().await {
15         unreachable!();
16     }
17 }
18 
19 #[tokio::test]
empty_stream()20 async fn empty_stream() {
21     let mut ran = false;
22 
23     {
24         let r = &mut ran;
25         let s = stream! {
26             *r = true;
27             println!("hello world!");
28         };
29         pin_mut!(s);
30 
31         while let Some(_) = s.next().await {
32             unreachable!();
33         }
34     }
35 
36     assert!(ran);
37 }
38 
39 #[tokio::test]
yield_single_value()40 async fn yield_single_value() {
41     let s = stream! {
42         yield "hello";
43     };
44 
45     let values: Vec<_> = s.collect().await;
46 
47     assert_eq!(1, values.len());
48     assert_eq!("hello", values[0]);
49 }
50 
51 #[tokio::test]
fused()52 async fn fused() {
53     let s = stream! {
54         yield "hello";
55     };
56     pin_mut!(s);
57 
58     assert!(!s.is_terminated());
59     assert_eq!(s.next().await, Some("hello"));
60     assert_eq!(s.next().await, None);
61 
62     assert!(s.is_terminated());
63     // This should return None from now on
64     assert_eq!(s.next().await, None);
65 }
66 
67 #[tokio::test]
yield_multi_value()68 async fn yield_multi_value() {
69     let s = stream! {
70         yield "hello";
71         yield "world";
72         yield "dizzy";
73     };
74 
75     let values: Vec<_> = s.collect().await;
76 
77     assert_eq!(3, values.len());
78     assert_eq!("hello", values[0]);
79     assert_eq!("world", values[1]);
80     assert_eq!("dizzy", values[2]);
81 }
82 
83 #[tokio::test]
return_stream()84 async fn return_stream() {
85     fn build_stream() -> impl Stream<Item = u32> {
86         stream! {
87             yield 1;
88             yield 2;
89             yield 3;
90         }
91     }
92 
93     let s = build_stream();
94 
95     let values: Vec<_> = s.collect().await;
96     assert_eq!(3, values.len());
97     assert_eq!(1, values[0]);
98     assert_eq!(2, values[1]);
99     assert_eq!(3, values[2]);
100 }
101 
102 #[tokio::test]
consume_channel()103 async fn consume_channel() {
104     let (mut tx, mut rx) = mpsc::channel(10);
105 
106     let s = stream! {
107         while let Some(v) = rx.recv().await {
108             yield v;
109         }
110     };
111 
112     pin_mut!(s);
113 
114     for i in 0..3 {
115         assert_ok!(tx.send(i).await);
116         assert_eq!(Some(i), s.next().await);
117     }
118 
119     drop(tx);
120     assert_eq!(None, s.next().await);
121 }
122 
123 #[tokio::test]
borrow_self()124 async fn borrow_self() {
125     struct Data(String);
126 
127     impl Data {
128         fn stream<'a>(&'a self) -> impl Stream<Item = &str> + 'a {
129             stream! {
130                 yield &self.0[..];
131             }
132         }
133     }
134 
135     let data = Data("hello".to_string());
136     let s = data.stream();
137     pin_mut!(s);
138 
139     assert_eq!(Some("hello"), s.next().await);
140 }
141 
142 #[tokio::test]
stream_in_stream()143 async fn stream_in_stream() {
144     let s = stream! {
145         let s = stream! {
146             for i in 0..3 {
147                 yield i;
148             }
149         };
150 
151         pin_mut!(s);
152         while let Some(v) = s.next().await {
153             yield v;
154         }
155     };
156 
157     let values: Vec<_> = s.collect().await;
158     assert_eq!(3, values.len());
159 }
160 
161 #[test]
test()162 fn test() {
163     let t = trybuild::TestCases::new();
164     t.compile_fail("tests/ui/*.rs");
165 }
166