• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::Stream;
2 
3 use std::borrow::Borrow;
4 use std::hash::Hash;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7 
8 /// Combine many streams into one, indexing each source stream with a unique
9 /// key.
10 ///
11 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
12 /// streams into a single merged stream that yields values in the order that
13 /// they arrive from the source streams. However, `StreamMap` has a lot more
14 /// flexibility in usage patterns.
15 ///
16 /// `StreamMap` can:
17 ///
18 /// * Merge an arbitrary number of streams.
19 /// * Track which source stream the value was received from.
20 /// * Handle inserting and removing streams from the set of managed streams at
21 ///   any point during iteration.
22 ///
23 /// All source streams held by `StreamMap` are indexed using a key. This key is
24 /// included with the value when a source stream yields a value. The key is also
25 /// used to remove the stream from the `StreamMap` before the stream has
26 /// completed streaming.
27 ///
28 /// # `Unpin`
29 ///
30 /// Because the `StreamMap` API moves streams during runtime, both streams and
31 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
32 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
33 /// pin the stream in the heap.
34 ///
35 /// # Implementation
36 ///
37 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
38 /// internal implementation detail will persist in future versions, but it is
39 /// important to know the runtime implications. In general, `StreamMap` works
40 /// best with a "smallish" number of streams as all entries are scanned on
41 /// insert, remove, and polling. In cases where a large number of streams need
42 /// to be merged, it may be advisable to use tasks sending values on a shared
43 /// [`mpsc`] channel.
44 ///
45 /// [`StreamExt::merge`]: crate::StreamExt::merge
46 /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
47 /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
48 /// [`Box::pin`]: std::boxed::Box::pin
49 ///
50 /// # Examples
51 ///
52 /// Merging two streams, then remove them after receiving the first value
53 ///
54 /// ```
55 /// use tokio_stream::{StreamExt, StreamMap, Stream};
56 /// use tokio::sync::mpsc;
57 /// use std::pin::Pin;
58 ///
59 /// #[tokio::main]
60 /// async fn main() {
61 ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
62 ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
63 ///
64 ///     // Convert the channels to a `Stream`.
65 ///     let rx1 = Box::pin(async_stream::stream! {
66 ///           while let Some(item) = rx1.recv().await {
67 ///               yield item;
68 ///           }
69 ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
70 ///
71 ///     let rx2 = Box::pin(async_stream::stream! {
72 ///           while let Some(item) = rx2.recv().await {
73 ///               yield item;
74 ///           }
75 ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
76 ///
77 ///     tokio::spawn(async move {
78 ///         tx1.send(1).await.unwrap();
79 ///
80 ///         // This value will never be received. The send may or may not return
81 ///         // `Err` depending on if the remote end closed first or not.
82 ///         let _ = tx1.send(2).await;
83 ///     });
84 ///
85 ///     tokio::spawn(async move {
86 ///         tx2.send(3).await.unwrap();
87 ///         let _ = tx2.send(4).await;
88 ///     });
89 ///
90 ///     let mut map = StreamMap::new();
91 ///
92 ///     // Insert both streams
93 ///     map.insert("one", rx1);
94 ///     map.insert("two", rx2);
95 ///
96 ///     // Read twice
97 ///     for _ in 0..2 {
98 ///         let (key, val) = map.next().await.unwrap();
99 ///
100 ///         if key == "one" {
101 ///             assert_eq!(val, 1);
102 ///         } else {
103 ///             assert_eq!(val, 3);
104 ///         }
105 ///
106 ///         // Remove the stream to prevent reading the next value
107 ///         map.remove(key);
108 ///     }
109 /// }
110 /// ```
111 ///
112 /// This example models a read-only client to a chat system with channels. The
113 /// client sends commands to join and leave channels. `StreamMap` is used to
114 /// manage active channel subscriptions.
115 ///
116 /// For simplicity, messages are displayed with `println!`, but they could be
117 /// sent to the client over a socket.
118 ///
119 /// ```no_run
120 /// use tokio_stream::{Stream, StreamExt, StreamMap};
121 ///
122 /// enum Command {
123 ///     Join(String),
124 ///     Leave(String),
125 /// }
126 ///
127 /// fn commands() -> impl Stream<Item = Command> {
128 ///     // Streams in user commands by parsing `stdin`.
129 /// # tokio_stream::pending()
130 /// }
131 ///
132 /// // Join a channel, returns a stream of messages received on the channel.
133 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
134 ///     // left as an exercise to the reader
135 /// # tokio_stream::pending()
136 /// }
137 ///
138 /// #[tokio::main]
139 /// async fn main() {
140 ///     let mut channels = StreamMap::new();
141 ///
142 ///     // Input commands (join / leave channels).
143 ///     let cmds = commands();
144 ///     tokio::pin!(cmds);
145 ///
146 ///     loop {
147 ///         tokio::select! {
148 ///             Some(cmd) = cmds.next() => {
149 ///                 match cmd {
150 ///                     Command::Join(chan) => {
151 ///                         // Join the channel and add it to the `channels`
152 ///                         // stream map
153 ///                         let msgs = join(&chan);
154 ///                         channels.insert(chan, msgs);
155 ///                     }
156 ///                     Command::Leave(chan) => {
157 ///                         channels.remove(&chan);
158 ///                     }
159 ///                 }
160 ///             }
161 ///             Some((chan, msg)) = channels.next() => {
162 ///                 // Received a message, display it on stdout with the channel
163 ///                 // it originated from.
164 ///                 println!("{}: {}", chan, msg);
165 ///             }
166 ///             // Both the `commands` stream and the `channels` stream are
167 ///             // complete. There is no more work to do, so leave the loop.
168 ///             else => break,
169 ///         }
170 ///     }
171 /// }
172 /// ```
173 #[derive(Debug)]
174 pub struct StreamMap<K, V> {
175     /// Streams stored in the map
176     entries: Vec<(K, V)>,
177 }
178 
179 impl<K, V> StreamMap<K, V> {
180     /// An iterator visiting all key-value pairs in arbitrary order.
181     ///
182     /// The iterator element type is &'a (K, V).
183     ///
184     /// # Examples
185     ///
186     /// ```
187     /// use tokio_stream::{StreamMap, pending};
188     ///
189     /// let mut map = StreamMap::new();
190     ///
191     /// map.insert("a", pending::<i32>());
192     /// map.insert("b", pending());
193     /// map.insert("c", pending());
194     ///
195     /// for (key, stream) in map.iter() {
196     ///     println!("({}, {:?})", key, stream);
197     /// }
198     /// ```
iter(&self) -> impl Iterator<Item = &(K, V)>199     pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
200         self.entries.iter()
201     }
202 
203     /// An iterator visiting all key-value pairs mutably in arbitrary order.
204     ///
205     /// The iterator element type is &'a mut (K, V).
206     ///
207     /// # Examples
208     ///
209     /// ```
210     /// use tokio_stream::{StreamMap, pending};
211     ///
212     /// let mut map = StreamMap::new();
213     ///
214     /// map.insert("a", pending::<i32>());
215     /// map.insert("b", pending());
216     /// map.insert("c", pending());
217     ///
218     /// for (key, stream) in map.iter_mut() {
219     ///     println!("({}, {:?})", key, stream);
220     /// }
221     /// ```
iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)>222     pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
223         self.entries.iter_mut()
224     }
225 
226     /// Creates an empty `StreamMap`.
227     ///
228     /// The stream map is initially created with a capacity of `0`, so it will
229     /// not allocate until it is first inserted into.
230     ///
231     /// # Examples
232     ///
233     /// ```
234     /// use tokio_stream::{StreamMap, Pending};
235     ///
236     /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
237     /// ```
new() -> StreamMap<K, V>238     pub fn new() -> StreamMap<K, V> {
239         StreamMap { entries: vec![] }
240     }
241 
242     /// Creates an empty `StreamMap` with the specified capacity.
243     ///
244     /// The stream map will be able to hold at least `capacity` elements without
245     /// reallocating. If `capacity` is 0, the stream map will not allocate.
246     ///
247     /// # Examples
248     ///
249     /// ```
250     /// use tokio_stream::{StreamMap, Pending};
251     ///
252     /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
253     /// ```
with_capacity(capacity: usize) -> StreamMap<K, V>254     pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
255         StreamMap {
256             entries: Vec::with_capacity(capacity),
257         }
258     }
259 
260     /// Returns an iterator visiting all keys in arbitrary order.
261     ///
262     /// The iterator element type is &'a K.
263     ///
264     /// # Examples
265     ///
266     /// ```
267     /// use tokio_stream::{StreamMap, pending};
268     ///
269     /// let mut map = StreamMap::new();
270     ///
271     /// map.insert("a", pending::<i32>());
272     /// map.insert("b", pending());
273     /// map.insert("c", pending());
274     ///
275     /// for key in map.keys() {
276     ///     println!("{}", key);
277     /// }
278     /// ```
keys(&self) -> impl Iterator<Item = &K>279     pub fn keys(&self) -> impl Iterator<Item = &K> {
280         self.iter().map(|(k, _)| k)
281     }
282 
283     /// An iterator visiting all values in arbitrary order.
284     ///
285     /// The iterator element type is &'a V.
286     ///
287     /// # Examples
288     ///
289     /// ```
290     /// use tokio_stream::{StreamMap, pending};
291     ///
292     /// let mut map = StreamMap::new();
293     ///
294     /// map.insert("a", pending::<i32>());
295     /// map.insert("b", pending());
296     /// map.insert("c", pending());
297     ///
298     /// for stream in map.values() {
299     ///     println!("{:?}", stream);
300     /// }
301     /// ```
values(&self) -> impl Iterator<Item = &V>302     pub fn values(&self) -> impl Iterator<Item = &V> {
303         self.iter().map(|(_, v)| v)
304     }
305 
306     /// An iterator visiting all values mutably in arbitrary order.
307     ///
308     /// The iterator element type is &'a mut V.
309     ///
310     /// # Examples
311     ///
312     /// ```
313     /// use tokio_stream::{StreamMap, pending};
314     ///
315     /// let mut map = StreamMap::new();
316     ///
317     /// map.insert("a", pending::<i32>());
318     /// map.insert("b", pending());
319     /// map.insert("c", pending());
320     ///
321     /// for stream in map.values_mut() {
322     ///     println!("{:?}", stream);
323     /// }
324     /// ```
values_mut(&mut self) -> impl Iterator<Item = &mut V>325     pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
326         self.iter_mut().map(|(_, v)| v)
327     }
328 
329     /// Returns the number of streams the map can hold without reallocating.
330     ///
331     /// This number is a lower bound; the `StreamMap` might be able to hold
332     /// more, but is guaranteed to be able to hold at least this many.
333     ///
334     /// # Examples
335     ///
336     /// ```
337     /// use tokio_stream::{StreamMap, Pending};
338     ///
339     /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
340     /// assert!(map.capacity() >= 100);
341     /// ```
capacity(&self) -> usize342     pub fn capacity(&self) -> usize {
343         self.entries.capacity()
344     }
345 
346     /// Returns the number of streams in the map.
347     ///
348     /// # Examples
349     ///
350     /// ```
351     /// use tokio_stream::{StreamMap, pending};
352     ///
353     /// let mut a = StreamMap::new();
354     /// assert_eq!(a.len(), 0);
355     /// a.insert(1, pending::<i32>());
356     /// assert_eq!(a.len(), 1);
357     /// ```
len(&self) -> usize358     pub fn len(&self) -> usize {
359         self.entries.len()
360     }
361 
362     /// Returns `true` if the map contains no elements.
363     ///
364     /// # Examples
365     ///
366     /// ```
367     /// use tokio_stream::{StreamMap, pending};
368     ///
369     /// let mut a = StreamMap::new();
370     /// assert!(a.is_empty());
371     /// a.insert(1, pending::<i32>());
372     /// assert!(!a.is_empty());
373     /// ```
is_empty(&self) -> bool374     pub fn is_empty(&self) -> bool {
375         self.entries.is_empty()
376     }
377 
378     /// Clears the map, removing all key-stream pairs. Keeps the allocated
379     /// memory for reuse.
380     ///
381     /// # Examples
382     ///
383     /// ```
384     /// use tokio_stream::{StreamMap, pending};
385     ///
386     /// let mut a = StreamMap::new();
387     /// a.insert(1, pending::<i32>());
388     /// a.clear();
389     /// assert!(a.is_empty());
390     /// ```
clear(&mut self)391     pub fn clear(&mut self) {
392         self.entries.clear();
393     }
394 
395     /// Insert a key-stream pair into the map.
396     ///
397     /// If the map did not have this key present, `None` is returned.
398     ///
399     /// If the map did have this key present, the new `stream` replaces the old
400     /// one and the old stream is returned.
401     ///
402     /// # Examples
403     ///
404     /// ```
405     /// use tokio_stream::{StreamMap, pending};
406     ///
407     /// let mut map = StreamMap::new();
408     ///
409     /// assert!(map.insert(37, pending::<i32>()).is_none());
410     /// assert!(!map.is_empty());
411     ///
412     /// map.insert(37, pending());
413     /// assert!(map.insert(37, pending()).is_some());
414     /// ```
insert(&mut self, k: K, stream: V) -> Option<V> where K: Hash + Eq,415     pub fn insert(&mut self, k: K, stream: V) -> Option<V>
416     where
417         K: Hash + Eq,
418     {
419         let ret = self.remove(&k);
420         self.entries.push((k, stream));
421 
422         ret
423     }
424 
425     /// Removes a key from the map, returning the stream at the key if the key was previously in the map.
426     ///
427     /// The key may be any borrowed form of the map's key type, but `Hash` and
428     /// `Eq` on the borrowed form must match those for the key type.
429     ///
430     /// # Examples
431     ///
432     /// ```
433     /// use tokio_stream::{StreamMap, pending};
434     ///
435     /// let mut map = StreamMap::new();
436     /// map.insert(1, pending::<i32>());
437     /// assert!(map.remove(&1).is_some());
438     /// assert!(map.remove(&1).is_none());
439     /// ```
remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq,440     pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
441     where
442         K: Borrow<Q>,
443         Q: Hash + Eq,
444     {
445         for i in 0..self.entries.len() {
446             if self.entries[i].0.borrow() == k {
447                 return Some(self.entries.swap_remove(i).1);
448             }
449         }
450 
451         None
452     }
453 
454     /// Returns `true` if the map contains a stream for the specified key.
455     ///
456     /// The key may be any borrowed form of the map's key type, but `Hash` and
457     /// `Eq` on the borrowed form must match those for the key type.
458     ///
459     /// # Examples
460     ///
461     /// ```
462     /// use tokio_stream::{StreamMap, pending};
463     ///
464     /// let mut map = StreamMap::new();
465     /// map.insert(1, pending::<i32>());
466     /// assert_eq!(map.contains_key(&1), true);
467     /// assert_eq!(map.contains_key(&2), false);
468     /// ```
contains_key<Q: ?Sized>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq,469     pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
470     where
471         K: Borrow<Q>,
472         Q: Hash + Eq,
473     {
474         for i in 0..self.entries.len() {
475             if self.entries[i].0.borrow() == k {
476                 return true;
477             }
478         }
479 
480         false
481     }
482 }
483 
484 impl<K, V> StreamMap<K, V>
485 where
486     K: Unpin,
487     V: Stream + Unpin,
488 {
489     /// Polls the next value, includes the vec entry index
poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>490     fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
491         use Poll::*;
492 
493         let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
494         let mut idx = start;
495 
496         for _ in 0..self.entries.len() {
497             let (_, stream) = &mut self.entries[idx];
498 
499             match Pin::new(stream).poll_next(cx) {
500                 Ready(Some(val)) => return Ready(Some((idx, val))),
501                 Ready(None) => {
502                     // Remove the entry
503                     self.entries.swap_remove(idx);
504 
505                     // Check if this was the last entry, if so the cursor needs
506                     // to wrap
507                     if idx == self.entries.len() {
508                         idx = 0;
509                     } else if idx < start && start <= self.entries.len() {
510                         // The stream being swapped into the current index has
511                         // already been polled, so skip it.
512                         idx = idx.wrapping_add(1) % self.entries.len();
513                     }
514                 }
515                 Pending => {
516                     idx = idx.wrapping_add(1) % self.entries.len();
517                 }
518             }
519         }
520 
521         // If the map is empty, then the stream is complete.
522         if self.entries.is_empty() {
523             Ready(None)
524         } else {
525             Pending
526         }
527     }
528 }
529 
530 impl<K, V> Default for StreamMap<K, V> {
default() -> Self531     fn default() -> Self {
532         Self::new()
533     }
534 }
535 
536 impl<K, V> Stream for StreamMap<K, V>
537 where
538     K: Clone + Unpin,
539     V: Stream + Unpin,
540 {
541     type Item = (K, V::Item);
542 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>543     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
544         if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
545             let key = self.entries[idx].0.clone();
546             Poll::Ready(Some((key, val)))
547         } else {
548             Poll::Ready(None)
549         }
550     }
551 
size_hint(&self) -> (usize, Option<usize>)552     fn size_hint(&self) -> (usize, Option<usize>) {
553         let mut ret = (0, Some(0));
554 
555         for (_, stream) in &self.entries {
556             let hint = stream.size_hint();
557 
558             ret.0 += hint.0;
559 
560             match (ret.1, hint.1) {
561                 (Some(a), Some(b)) => ret.1 = Some(a + b),
562                 (Some(_), None) => ret.1 = None,
563                 _ => {}
564             }
565         }
566 
567         ret
568     }
569 }
570 
571 impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V>
572 where
573     K: Hash + Eq,
574 {
from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self575     fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
576         let iterator = iter.into_iter();
577         let (lower_bound, _) = iterator.size_hint();
578         let mut stream_map = Self::with_capacity(lower_bound);
579 
580         for (key, value) in iterator {
581             stream_map.insert(key, value);
582         }
583 
584         stream_map
585     }
586 }
587 
588 mod rand {
589     use std::cell::Cell;
590 
591     mod loom {
592         #[cfg(not(loom))]
593         pub(crate) mod rand {
594             use std::collections::hash_map::RandomState;
595             use std::hash::{BuildHasher, Hash, Hasher};
596             use std::sync::atomic::AtomicU32;
597             use std::sync::atomic::Ordering::Relaxed;
598 
599             static COUNTER: AtomicU32 = AtomicU32::new(1);
600 
seed() -> u64601             pub(crate) fn seed() -> u64 {
602                 let rand_state = RandomState::new();
603 
604                 let mut hasher = rand_state.build_hasher();
605 
606                 // Hash some unique-ish data to generate some new state
607                 COUNTER.fetch_add(1, Relaxed).hash(&mut hasher);
608 
609                 // Get the seed
610                 hasher.finish()
611             }
612         }
613 
614         #[cfg(loom)]
615         pub(crate) mod rand {
seed() -> u64616             pub(crate) fn seed() -> u64 {
617                 1
618             }
619         }
620     }
621 
622     /// Fast random number generate
623     ///
624     /// Implement xorshift64+: 2 32-bit xorshift sequences added together.
625     /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
626     /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
627     /// This generator passes the SmallCrush suite, part of TestU01 framework:
628     /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
629     #[derive(Debug)]
630     pub(crate) struct FastRand {
631         one: Cell<u32>,
632         two: Cell<u32>,
633     }
634 
635     impl FastRand {
636         /// Initialize a new, thread-local, fast random number generator.
new(seed: u64) -> FastRand637         pub(crate) fn new(seed: u64) -> FastRand {
638             let one = (seed >> 32) as u32;
639             let mut two = seed as u32;
640 
641             if two == 0 {
642                 // This value cannot be zero
643                 two = 1;
644             }
645 
646             FastRand {
647                 one: Cell::new(one),
648                 two: Cell::new(two),
649             }
650         }
651 
fastrand_n(&self, n: u32) -> u32652         pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
653             // This is similar to fastrand() % n, but faster.
654             // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
655             let mul = (self.fastrand() as u64).wrapping_mul(n as u64);
656             (mul >> 32) as u32
657         }
658 
fastrand(&self) -> u32659         fn fastrand(&self) -> u32 {
660             let mut s1 = self.one.get();
661             let s0 = self.two.get();
662 
663             s1 ^= s1 << 17;
664             s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16;
665 
666             self.one.set(s0);
667             self.two.set(s1);
668 
669             s0.wrapping_add(s1)
670         }
671     }
672 
673     // Used by `StreamMap`
thread_rng_n(n: u32) -> u32674     pub(crate) fn thread_rng_n(n: u32) -> u32 {
675         thread_local! {
676             static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
677         }
678 
679         THREAD_RNG.with(|rng| rng.fastrand_n(n))
680     }
681 }
682