• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
2 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
3 
4 mod support {
5     pub(crate) mod mpsc;
6 }
7 
8 use support::mpsc;
9 
10 use std::pin::Pin;
11 
12 macro_rules! assert_ready_some {
13     ($($t:tt)*) => {
14         match assert_ready!($($t)*) {
15             Some(v) => v,
16             None => panic!("expected `Some`, got `None`"),
17         }
18     };
19 }
20 
21 macro_rules! assert_ready_none {
22     ($($t:tt)*) => {
23         match assert_ready!($($t)*) {
24             None => {}
25             Some(v) => panic!("expected `None`, got `Some({:?})`", v),
26         }
27     };
28 }
29 
30 #[tokio::test]
empty()31 async fn empty() {
32     let mut map = StreamMap::<&str, stream::Pending<()>>::new();
33 
34     assert_eq!(map.len(), 0);
35     assert!(map.is_empty());
36 
37     assert!(map.next().await.is_none());
38     assert!(map.next().await.is_none());
39 
40     assert!(map.remove("foo").is_none());
41 }
42 
43 #[tokio::test]
single_entry()44 async fn single_entry() {
45     let mut map = task::spawn(StreamMap::new());
46     let (tx, rx) = mpsc::unbounded_channel_stream();
47     let rx = Box::pin(rx);
48 
49     assert_ready_none!(map.poll_next());
50 
51     assert!(map.insert("foo", rx).is_none());
52     assert!(map.contains_key("foo"));
53     assert!(!map.contains_key("bar"));
54 
55     assert_eq!(map.len(), 1);
56     assert!(!map.is_empty());
57 
58     assert_pending!(map.poll_next());
59 
60     assert_ok!(tx.send(1));
61 
62     assert!(map.is_woken());
63     let (k, v) = assert_ready_some!(map.poll_next());
64     assert_eq!(k, "foo");
65     assert_eq!(v, 1);
66 
67     assert_pending!(map.poll_next());
68 
69     assert_ok!(tx.send(2));
70 
71     assert!(map.is_woken());
72     let (k, v) = assert_ready_some!(map.poll_next());
73     assert_eq!(k, "foo");
74     assert_eq!(v, 2);
75 
76     assert_pending!(map.poll_next());
77     drop(tx);
78     assert!(map.is_woken());
79     assert_ready_none!(map.poll_next());
80 }
81 
82 #[tokio::test]
multiple_entries()83 async fn multiple_entries() {
84     let mut map = task::spawn(StreamMap::new());
85     let (tx1, rx1) = mpsc::unbounded_channel_stream();
86     let (tx2, rx2) = mpsc::unbounded_channel_stream();
87 
88     let rx1 = Box::pin(rx1);
89     let rx2 = Box::pin(rx2);
90 
91     map.insert("foo", rx1);
92     map.insert("bar", rx2);
93 
94     assert_pending!(map.poll_next());
95 
96     assert_ok!(tx1.send(1));
97 
98     assert!(map.is_woken());
99     let (k, v) = assert_ready_some!(map.poll_next());
100     assert_eq!(k, "foo");
101     assert_eq!(v, 1);
102 
103     assert_pending!(map.poll_next());
104 
105     assert_ok!(tx2.send(2));
106 
107     assert!(map.is_woken());
108     let (k, v) = assert_ready_some!(map.poll_next());
109     assert_eq!(k, "bar");
110     assert_eq!(v, 2);
111 
112     assert_pending!(map.poll_next());
113 
114     assert_ok!(tx1.send(3));
115     assert_ok!(tx2.send(4));
116 
117     assert!(map.is_woken());
118 
119     // Given the randomization, there is no guarantee what order the values will
120     // be received in.
121     let mut v = (0..2)
122         .map(|_| assert_ready_some!(map.poll_next()))
123         .collect::<Vec<_>>();
124 
125     assert_pending!(map.poll_next());
126 
127     v.sort_unstable();
128     assert_eq!(v[0].0, "bar");
129     assert_eq!(v[0].1, 4);
130     assert_eq!(v[1].0, "foo");
131     assert_eq!(v[1].1, 3);
132 
133     drop(tx1);
134     assert!(map.is_woken());
135     assert_pending!(map.poll_next());
136     drop(tx2);
137 
138     assert_ready_none!(map.poll_next());
139 }
140 
141 #[tokio::test]
insert_remove()142 async fn insert_remove() {
143     let mut map = task::spawn(StreamMap::new());
144     let (tx, rx) = mpsc::unbounded_channel_stream();
145 
146     let rx = Box::pin(rx);
147 
148     assert_ready_none!(map.poll_next());
149 
150     assert!(map.insert("foo", rx).is_none());
151     let rx = map.remove("foo").unwrap();
152 
153     assert_ok!(tx.send(1));
154 
155     assert!(!map.is_woken());
156     assert_ready_none!(map.poll_next());
157 
158     assert!(map.insert("bar", rx).is_none());
159 
160     let v = assert_ready_some!(map.poll_next());
161     assert_eq!(v.0, "bar");
162     assert_eq!(v.1, 1);
163 
164     assert!(map.remove("bar").is_some());
165     assert_ready_none!(map.poll_next());
166 
167     assert!(map.is_empty());
168     assert_eq!(0, map.len());
169 }
170 
171 #[tokio::test]
replace()172 async fn replace() {
173     let mut map = task::spawn(StreamMap::new());
174     let (tx1, rx1) = mpsc::unbounded_channel_stream();
175     let (tx2, rx2) = mpsc::unbounded_channel_stream();
176 
177     let rx1 = Box::pin(rx1);
178     let rx2 = Box::pin(rx2);
179 
180     assert!(map.insert("foo", rx1).is_none());
181 
182     assert_pending!(map.poll_next());
183 
184     let _rx1 = map.insert("foo", rx2).unwrap();
185 
186     assert_pending!(map.poll_next());
187 
188     tx1.send(1).unwrap();
189     assert_pending!(map.poll_next());
190 
191     tx2.send(2).unwrap();
192     assert!(map.is_woken());
193     let v = assert_ready_some!(map.poll_next());
194     assert_eq!(v.0, "foo");
195     assert_eq!(v.1, 2);
196 }
197 
198 #[test]
size_hint_with_upper()199 fn size_hint_with_upper() {
200     let mut map = StreamMap::new();
201 
202     map.insert("a", stream::iter(vec![1]));
203     map.insert("b", stream::iter(vec![1, 2]));
204     map.insert("c", stream::iter(vec![1, 2, 3]));
205 
206     assert_eq!(3, map.len());
207     assert!(!map.is_empty());
208 
209     let size_hint = map.size_hint();
210     assert_eq!(size_hint, (6, Some(6)));
211 }
212 
213 #[test]
size_hint_without_upper()214 fn size_hint_without_upper() {
215     let mut map = StreamMap::new();
216 
217     map.insert("a", pin_box(stream::iter(vec![1])));
218     map.insert("b", pin_box(stream::iter(vec![1, 2])));
219     map.insert("c", pin_box(pending()));
220 
221     let size_hint = map.size_hint();
222     assert_eq!(size_hint, (3, None));
223 }
224 
225 #[test]
new_capacity_zero()226 fn new_capacity_zero() {
227     let map = StreamMap::<&str, stream::Pending<()>>::new();
228     assert_eq!(0, map.capacity());
229 
230     assert!(map.keys().next().is_none());
231 }
232 
233 #[test]
with_capacity()234 fn with_capacity() {
235     let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
236     assert!(10 <= map.capacity());
237 
238     assert!(map.keys().next().is_none());
239 }
240 
241 #[test]
iter_keys()242 fn iter_keys() {
243     let mut map = StreamMap::new();
244 
245     map.insert("a", pending::<i32>());
246     map.insert("b", pending());
247     map.insert("c", pending());
248 
249     let mut keys = map.keys().collect::<Vec<_>>();
250     keys.sort_unstable();
251 
252     assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
253 }
254 
255 #[test]
iter_values()256 fn iter_values() {
257     let mut map = StreamMap::new();
258 
259     map.insert("a", stream::iter(vec![1]));
260     map.insert("b", stream::iter(vec![1, 2]));
261     map.insert("c", stream::iter(vec![1, 2, 3]));
262 
263     let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
264 
265     size_hints.sort_unstable();
266 
267     assert_eq!(&size_hints[..], &[1, 2, 3]);
268 }
269 
270 #[test]
iter_values_mut()271 fn iter_values_mut() {
272     let mut map = StreamMap::new();
273 
274     map.insert("a", stream::iter(vec![1]));
275     map.insert("b", stream::iter(vec![1, 2]));
276     map.insert("c", stream::iter(vec![1, 2, 3]));
277 
278     let mut size_hints = map
279         .values_mut()
280         .map(|s: &mut _| s.size_hint().0)
281         .collect::<Vec<_>>();
282 
283     size_hints.sort_unstable();
284 
285     assert_eq!(&size_hints[..], &[1, 2, 3]);
286 }
287 
288 #[test]
clear()289 fn clear() {
290     let mut map = task::spawn(StreamMap::new());
291 
292     map.insert("a", stream::iter(vec![1]));
293     map.insert("b", stream::iter(vec![1, 2]));
294     map.insert("c", stream::iter(vec![1, 2, 3]));
295 
296     assert_ready_some!(map.poll_next());
297 
298     map.clear();
299 
300     assert_ready_none!(map.poll_next());
301     assert!(map.is_empty());
302 }
303 
304 #[test]
contains_key_borrow()305 fn contains_key_borrow() {
306     let mut map = StreamMap::new();
307     map.insert("foo".to_string(), pending::<()>());
308 
309     assert!(map.contains_key("foo"));
310 }
311 
312 #[test]
one_ready_many_none()313 fn one_ready_many_none() {
314     // Run a few times because of randomness
315     for _ in 0..100 {
316         let mut map = task::spawn(StreamMap::new());
317 
318         map.insert(0, pin_box(stream::empty()));
319         map.insert(1, pin_box(stream::empty()));
320         map.insert(2, pin_box(stream::once("hello")));
321         map.insert(3, pin_box(stream::pending()));
322 
323         let v = assert_ready_some!(map.poll_next());
324         assert_eq!(v, (2, "hello"));
325     }
326 }
327 
328 proptest::proptest! {
329     #[test]
330     fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
331         use std::task::{Context, Poll};
332 
333         struct DidPoll<T> {
334             did_poll: bool,
335             inner: T,
336         }
337 
338         impl<T: Stream + Unpin> Stream for DidPoll<T> {
339             type Item = T::Item;
340 
341             fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
342                 -> Poll<Option<T::Item>>
343             {
344                 self.did_poll = true;
345                 Pin::new(&mut self.inner).poll_next(cx)
346             }
347         }
348 
349         for _ in 0..10 {
350             let mut map = task::spawn(StreamMap::new());
351             let mut expect = 0;
352 
353             for (i, &is_empty) in kinds.iter().enumerate() {
354                 let inner = if is_empty {
355                     pin_box(stream::empty::<()>())
356                 } else {
357                     expect += 1;
358                     pin_box(stream::pending::<()>())
359                 };
360 
361                 let stream = DidPoll {
362                     did_poll: false,
363                     inner,
364                 };
365 
366                 map.insert(i, stream);
367             }
368 
369             if expect == 0 {
370                 assert_ready_none!(map.poll_next());
371             } else {
372                 assert_pending!(map.poll_next());
373 
374                 assert_eq!(expect, map.values().count());
375 
376                 for stream in map.values() {
377                     assert!(stream.did_poll);
378                 }
379             }
380         }
381     }
382 }
383 
pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>>384 fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
385     Box::pin(s)
386 }
387