1 use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
2 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
3
4 mod support {
5 pub(crate) mod mpsc;
6 }
7
8 use support::mpsc;
9
10 use std::pin::Pin;
11
12 macro_rules! assert_ready_some {
13 ($($t:tt)*) => {
14 match assert_ready!($($t)*) {
15 Some(v) => v,
16 None => panic!("expected `Some`, got `None`"),
17 }
18 };
19 }
20
21 macro_rules! assert_ready_none {
22 ($($t:tt)*) => {
23 match assert_ready!($($t)*) {
24 None => {}
25 Some(v) => panic!("expected `None`, got `Some({:?})`", v),
26 }
27 };
28 }
29
30 #[tokio::test]
empty()31 async fn empty() {
32 let mut map = StreamMap::<&str, stream::Pending<()>>::new();
33
34 assert_eq!(map.len(), 0);
35 assert!(map.is_empty());
36
37 assert!(map.next().await.is_none());
38 assert!(map.next().await.is_none());
39
40 assert!(map.remove("foo").is_none());
41 }
42
43 #[tokio::test]
single_entry()44 async fn single_entry() {
45 let mut map = task::spawn(StreamMap::new());
46 let (tx, rx) = mpsc::unbounded_channel_stream();
47 let rx = Box::pin(rx);
48
49 assert_ready_none!(map.poll_next());
50
51 assert!(map.insert("foo", rx).is_none());
52 assert!(map.contains_key("foo"));
53 assert!(!map.contains_key("bar"));
54
55 assert_eq!(map.len(), 1);
56 assert!(!map.is_empty());
57
58 assert_pending!(map.poll_next());
59
60 assert_ok!(tx.send(1));
61
62 assert!(map.is_woken());
63 let (k, v) = assert_ready_some!(map.poll_next());
64 assert_eq!(k, "foo");
65 assert_eq!(v, 1);
66
67 assert_pending!(map.poll_next());
68
69 assert_ok!(tx.send(2));
70
71 assert!(map.is_woken());
72 let (k, v) = assert_ready_some!(map.poll_next());
73 assert_eq!(k, "foo");
74 assert_eq!(v, 2);
75
76 assert_pending!(map.poll_next());
77 drop(tx);
78 assert!(map.is_woken());
79 assert_ready_none!(map.poll_next());
80 }
81
82 #[tokio::test]
multiple_entries()83 async fn multiple_entries() {
84 let mut map = task::spawn(StreamMap::new());
85 let (tx1, rx1) = mpsc::unbounded_channel_stream();
86 let (tx2, rx2) = mpsc::unbounded_channel_stream();
87
88 let rx1 = Box::pin(rx1);
89 let rx2 = Box::pin(rx2);
90
91 map.insert("foo", rx1);
92 map.insert("bar", rx2);
93
94 assert_pending!(map.poll_next());
95
96 assert_ok!(tx1.send(1));
97
98 assert!(map.is_woken());
99 let (k, v) = assert_ready_some!(map.poll_next());
100 assert_eq!(k, "foo");
101 assert_eq!(v, 1);
102
103 assert_pending!(map.poll_next());
104
105 assert_ok!(tx2.send(2));
106
107 assert!(map.is_woken());
108 let (k, v) = assert_ready_some!(map.poll_next());
109 assert_eq!(k, "bar");
110 assert_eq!(v, 2);
111
112 assert_pending!(map.poll_next());
113
114 assert_ok!(tx1.send(3));
115 assert_ok!(tx2.send(4));
116
117 assert!(map.is_woken());
118
119 // Given the randomization, there is no guarantee what order the values will
120 // be received in.
121 let mut v = (0..2)
122 .map(|_| assert_ready_some!(map.poll_next()))
123 .collect::<Vec<_>>();
124
125 assert_pending!(map.poll_next());
126
127 v.sort_unstable();
128 assert_eq!(v[0].0, "bar");
129 assert_eq!(v[0].1, 4);
130 assert_eq!(v[1].0, "foo");
131 assert_eq!(v[1].1, 3);
132
133 drop(tx1);
134 assert!(map.is_woken());
135 assert_pending!(map.poll_next());
136 drop(tx2);
137
138 assert_ready_none!(map.poll_next());
139 }
140
141 #[tokio::test]
insert_remove()142 async fn insert_remove() {
143 let mut map = task::spawn(StreamMap::new());
144 let (tx, rx) = mpsc::unbounded_channel_stream();
145
146 let rx = Box::pin(rx);
147
148 assert_ready_none!(map.poll_next());
149
150 assert!(map.insert("foo", rx).is_none());
151 let rx = map.remove("foo").unwrap();
152
153 assert_ok!(tx.send(1));
154
155 assert!(!map.is_woken());
156 assert_ready_none!(map.poll_next());
157
158 assert!(map.insert("bar", rx).is_none());
159
160 let v = assert_ready_some!(map.poll_next());
161 assert_eq!(v.0, "bar");
162 assert_eq!(v.1, 1);
163
164 assert!(map.remove("bar").is_some());
165 assert_ready_none!(map.poll_next());
166
167 assert!(map.is_empty());
168 assert_eq!(0, map.len());
169 }
170
171 #[tokio::test]
replace()172 async fn replace() {
173 let mut map = task::spawn(StreamMap::new());
174 let (tx1, rx1) = mpsc::unbounded_channel_stream();
175 let (tx2, rx2) = mpsc::unbounded_channel_stream();
176
177 let rx1 = Box::pin(rx1);
178 let rx2 = Box::pin(rx2);
179
180 assert!(map.insert("foo", rx1).is_none());
181
182 assert_pending!(map.poll_next());
183
184 let _rx1 = map.insert("foo", rx2).unwrap();
185
186 assert_pending!(map.poll_next());
187
188 tx1.send(1).unwrap();
189 assert_pending!(map.poll_next());
190
191 tx2.send(2).unwrap();
192 assert!(map.is_woken());
193 let v = assert_ready_some!(map.poll_next());
194 assert_eq!(v.0, "foo");
195 assert_eq!(v.1, 2);
196 }
197
198 #[test]
size_hint_with_upper()199 fn size_hint_with_upper() {
200 let mut map = StreamMap::new();
201
202 map.insert("a", stream::iter(vec![1]));
203 map.insert("b", stream::iter(vec![1, 2]));
204 map.insert("c", stream::iter(vec![1, 2, 3]));
205
206 assert_eq!(3, map.len());
207 assert!(!map.is_empty());
208
209 let size_hint = map.size_hint();
210 assert_eq!(size_hint, (6, Some(6)));
211 }
212
213 #[test]
size_hint_without_upper()214 fn size_hint_without_upper() {
215 let mut map = StreamMap::new();
216
217 map.insert("a", pin_box(stream::iter(vec![1])));
218 map.insert("b", pin_box(stream::iter(vec![1, 2])));
219 map.insert("c", pin_box(pending()));
220
221 let size_hint = map.size_hint();
222 assert_eq!(size_hint, (3, None));
223 }
224
225 #[test]
new_capacity_zero()226 fn new_capacity_zero() {
227 let map = StreamMap::<&str, stream::Pending<()>>::new();
228 assert_eq!(0, map.capacity());
229
230 assert!(map.keys().next().is_none());
231 }
232
233 #[test]
with_capacity()234 fn with_capacity() {
235 let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
236 assert!(10 <= map.capacity());
237
238 assert!(map.keys().next().is_none());
239 }
240
241 #[test]
iter_keys()242 fn iter_keys() {
243 let mut map = StreamMap::new();
244
245 map.insert("a", pending::<i32>());
246 map.insert("b", pending());
247 map.insert("c", pending());
248
249 let mut keys = map.keys().collect::<Vec<_>>();
250 keys.sort_unstable();
251
252 assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
253 }
254
255 #[test]
iter_values()256 fn iter_values() {
257 let mut map = StreamMap::new();
258
259 map.insert("a", stream::iter(vec![1]));
260 map.insert("b", stream::iter(vec![1, 2]));
261 map.insert("c", stream::iter(vec![1, 2, 3]));
262
263 let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
264
265 size_hints.sort_unstable();
266
267 assert_eq!(&size_hints[..], &[1, 2, 3]);
268 }
269
270 #[test]
iter_values_mut()271 fn iter_values_mut() {
272 let mut map = StreamMap::new();
273
274 map.insert("a", stream::iter(vec![1]));
275 map.insert("b", stream::iter(vec![1, 2]));
276 map.insert("c", stream::iter(vec![1, 2, 3]));
277
278 let mut size_hints = map
279 .values_mut()
280 .map(|s: &mut _| s.size_hint().0)
281 .collect::<Vec<_>>();
282
283 size_hints.sort_unstable();
284
285 assert_eq!(&size_hints[..], &[1, 2, 3]);
286 }
287
288 #[test]
clear()289 fn clear() {
290 let mut map = task::spawn(StreamMap::new());
291
292 map.insert("a", stream::iter(vec![1]));
293 map.insert("b", stream::iter(vec![1, 2]));
294 map.insert("c", stream::iter(vec![1, 2, 3]));
295
296 assert_ready_some!(map.poll_next());
297
298 map.clear();
299
300 assert_ready_none!(map.poll_next());
301 assert!(map.is_empty());
302 }
303
304 #[test]
contains_key_borrow()305 fn contains_key_borrow() {
306 let mut map = StreamMap::new();
307 map.insert("foo".to_string(), pending::<()>());
308
309 assert!(map.contains_key("foo"));
310 }
311
312 #[test]
one_ready_many_none()313 fn one_ready_many_none() {
314 // Run a few times because of randomness
315 for _ in 0..100 {
316 let mut map = task::spawn(StreamMap::new());
317
318 map.insert(0, pin_box(stream::empty()));
319 map.insert(1, pin_box(stream::empty()));
320 map.insert(2, pin_box(stream::once("hello")));
321 map.insert(3, pin_box(stream::pending()));
322
323 let v = assert_ready_some!(map.poll_next());
324 assert_eq!(v, (2, "hello"));
325 }
326 }
327
328 proptest::proptest! {
329 #[test]
330 fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
331 use std::task::{Context, Poll};
332
333 struct DidPoll<T> {
334 did_poll: bool,
335 inner: T,
336 }
337
338 impl<T: Stream + Unpin> Stream for DidPoll<T> {
339 type Item = T::Item;
340
341 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
342 -> Poll<Option<T::Item>>
343 {
344 self.did_poll = true;
345 Pin::new(&mut self.inner).poll_next(cx)
346 }
347 }
348
349 for _ in 0..10 {
350 let mut map = task::spawn(StreamMap::new());
351 let mut expect = 0;
352
353 for (i, &is_empty) in kinds.iter().enumerate() {
354 let inner = if is_empty {
355 pin_box(stream::empty::<()>())
356 } else {
357 expect += 1;
358 pin_box(stream::pending::<()>())
359 };
360
361 let stream = DidPoll {
362 did_poll: false,
363 inner,
364 };
365
366 map.insert(i, stream);
367 }
368
369 if expect == 0 {
370 assert_ready_none!(map.poll_next());
371 } else {
372 assert_pending!(map.poll_next());
373
374 assert_eq!(expect, map.values().count());
375
376 for stream in map.values() {
377 assert!(stream.did_poll);
378 }
379 }
380 }
381 }
382 }
383
pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>>384 fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
385 Box::pin(s)
386 }
387