• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::Stream;
2 
3 use std::borrow::Borrow;
4 use std::hash::Hash;
5 use std::pin::Pin;
6 use std::task::{Context, Poll};
7 
8 /// Combine many streams into one, indexing each source stream with a unique
9 /// key.
10 ///
11 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
12 /// streams into a single merged stream that yields values in the order that
13 /// they arrive from the source streams. However, `StreamMap` has a lot more
14 /// flexibility in usage patterns.
15 ///
16 /// `StreamMap` can:
17 ///
18 /// * Merge an arbitrary number of streams.
19 /// * Track which source stream the value was received from.
20 /// * Handle inserting and removing streams from the set of managed streams at
21 ///   any point during iteration.
22 ///
23 /// All source streams held by `StreamMap` are indexed using a key. This key is
24 /// included with the value when a source stream yields a value. The key is also
25 /// used to remove the stream from the `StreamMap` before the stream has
26 /// completed streaming.
27 ///
28 /// # `Unpin`
29 ///
30 /// Because the `StreamMap` API moves streams during runtime, both streams and
31 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
32 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
33 /// pin the stream in the heap.
34 ///
35 /// # Implementation
36 ///
37 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this
38 /// internal implementation detail will persist in future versions, but it is
39 /// important to know the runtime implications. In general, `StreamMap` works
40 /// best with a "smallish" number of streams as all entries are scanned on
41 /// insert, remove, and polling. In cases where a large number of streams need
42 /// to be merged, it may be advisable to use tasks sending values on a shared
43 /// [`mpsc`] channel.
44 ///
45 /// # Notes
46 ///
/// `StreamMap` removes finished streams automatically, without alerting the user.
/// In some scenarios, the caller may want to know when a stream has closed.
/// To do this, wrap each stream in [`StreamNotifyClose`] before inserting it.
/// The wrapper yields `None` once the wrapped stream is closed.
51 ///
52 /// [`StreamExt::merge`]: crate::StreamExt::merge
53 /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html
54 /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html
55 /// [`Box::pin`]: std::boxed::Box::pin
56 /// [`StreamNotifyClose`]: crate::StreamNotifyClose
57 ///
58 /// # Examples
59 ///
/// Merging two streams, then removing each one after receiving its first value
61 ///
62 /// ```
63 /// use tokio_stream::{StreamExt, StreamMap, Stream};
64 /// use tokio::sync::mpsc;
65 /// use std::pin::Pin;
66 ///
67 /// #[tokio::main]
68 /// async fn main() {
69 ///     let (tx1, mut rx1) = mpsc::channel::<usize>(10);
70 ///     let (tx2, mut rx2) = mpsc::channel::<usize>(10);
71 ///
72 ///     // Convert the channels to a `Stream`.
73 ///     let rx1 = Box::pin(async_stream::stream! {
74 ///           while let Some(item) = rx1.recv().await {
75 ///               yield item;
76 ///           }
77 ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
78 ///
79 ///     let rx2 = Box::pin(async_stream::stream! {
80 ///           while let Some(item) = rx2.recv().await {
81 ///               yield item;
82 ///           }
83 ///     }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
84 ///
85 ///     tokio::spawn(async move {
86 ///         tx1.send(1).await.unwrap();
87 ///
88 ///         // This value will never be received. The send may or may not return
89 ///         // `Err` depending on if the remote end closed first or not.
90 ///         let _ = tx1.send(2).await;
91 ///     });
92 ///
93 ///     tokio::spawn(async move {
94 ///         tx2.send(3).await.unwrap();
95 ///         let _ = tx2.send(4).await;
96 ///     });
97 ///
98 ///     let mut map = StreamMap::new();
99 ///
100 ///     // Insert both streams
101 ///     map.insert("one", rx1);
102 ///     map.insert("two", rx2);
103 ///
104 ///     // Read twice
105 ///     for _ in 0..2 {
106 ///         let (key, val) = map.next().await.unwrap();
107 ///
108 ///         if key == "one" {
109 ///             assert_eq!(val, 1);
110 ///         } else {
111 ///             assert_eq!(val, 3);
112 ///         }
113 ///
114 ///         // Remove the stream to prevent reading the next value
115 ///         map.remove(key);
116 ///     }
117 /// }
118 /// ```
119 ///
120 /// This example models a read-only client to a chat system with channels. The
121 /// client sends commands to join and leave channels. `StreamMap` is used to
122 /// manage active channel subscriptions.
123 ///
124 /// For simplicity, messages are displayed with `println!`, but they could be
125 /// sent to the client over a socket.
126 ///
127 /// ```no_run
128 /// use tokio_stream::{Stream, StreamExt, StreamMap};
129 ///
130 /// enum Command {
131 ///     Join(String),
132 ///     Leave(String),
133 /// }
134 ///
135 /// fn commands() -> impl Stream<Item = Command> {
136 ///     // Streams in user commands by parsing `stdin`.
137 /// # tokio_stream::pending()
138 /// }
139 ///
140 /// // Join a channel, returns a stream of messages received on the channel.
141 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
142 ///     // left as an exercise to the reader
143 /// # tokio_stream::pending()
144 /// }
145 ///
146 /// #[tokio::main]
147 /// async fn main() {
148 ///     let mut channels = StreamMap::new();
149 ///
150 ///     // Input commands (join / leave channels).
151 ///     let cmds = commands();
152 ///     tokio::pin!(cmds);
153 ///
154 ///     loop {
155 ///         tokio::select! {
156 ///             Some(cmd) = cmds.next() => {
157 ///                 match cmd {
158 ///                     Command::Join(chan) => {
159 ///                         // Join the channel and add it to the `channels`
160 ///                         // stream map
161 ///                         let msgs = join(&chan);
162 ///                         channels.insert(chan, msgs);
163 ///                     }
164 ///                     Command::Leave(chan) => {
165 ///                         channels.remove(&chan);
166 ///                     }
167 ///                 }
168 ///             }
169 ///             Some((chan, msg)) = channels.next() => {
170 ///                 // Received a message, display it on stdout with the channel
171 ///                 // it originated from.
172 ///                 println!("{}: {}", chan, msg);
173 ///             }
174 ///             // Both the `commands` stream and the `channels` stream are
175 ///             // complete. There is no more work to do, so leave the loop.
176 ///             else => break,
177 ///         }
178 ///     }
179 /// }
180 /// ```
181 ///
182 /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`.
183 ///
184 /// ```
185 /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose};
186 ///
187 /// #[tokio::main]
188 /// async fn main() {
189 ///     let mut map = StreamMap::new();
190 ///     let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
191 ///     let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
192 ///     map.insert(0, stream);
193 ///     map.insert(1, stream2);
194 ///     while let Some((key, val)) = map.next().await {
195 ///         match val {
196 ///             Some(val) => println!("got {val:?} from stream {key:?}"),
197 ///             None => println!("stream {key:?} closed"),
198 ///         }
199 ///     }
200 /// }
201 /// ```
202 
#[derive(Debug)]
pub struct StreamMap<K, V> {
    /// Backing storage: one `(key, stream)` pair per managed stream.
    ///
    /// `insert` removes any entry with an equal key before pushing the new
    /// pair, so keys added through it stay unique.
    entries: Vec<(K, V)>,
}

impl<K, V> StreamMap<K, V> {
    /// Returns an iterator over every key-stream pair, in arbitrary order.
    ///
    /// Items are shared references of type `&'a (K, V)`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for (key, stream) in map.iter() {
    ///     println!("({}, {:?})", key, stream);
    /// }
    /// ```
    pub fn iter(&self) -> impl Iterator<Item = &(K, V)> {
        self.entries.iter()
    }

    /// Returns an iterator over every key-stream pair that allows mutation,
    /// in arbitrary order.
    ///
    /// Items are exclusive references of type `&'a mut (K, V)`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for (key, stream) in map.iter_mut() {
    ///     println!("({}, {:?})", key, stream);
    /// }
    /// ```
    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> {
        self.entries.iter_mut()
    }

    /// Builds a `StreamMap` that holds no streams.
    ///
    /// The map starts with a capacity of `0`; nothing is allocated until the
    /// first stream is inserted.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, Pending};
    ///
    /// let map: StreamMap<&str, Pending<()>> = StreamMap::new();
    /// ```
    pub fn new() -> StreamMap<K, V> {
        Self {
            entries: Vec::new(),
        }
    }

    /// Builds a `StreamMap` that holds no streams but has room reserved for
    /// at least `capacity` of them.
    ///
    /// Inserting up to `capacity` streams will not reallocate. A `capacity`
    /// of `0` performs no allocation at all.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, Pending};
    ///
    /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10);
    /// ```
    pub fn with_capacity(capacity: usize) -> StreamMap<K, V> {
        Self {
            entries: Vec::with_capacity(capacity),
        }
    }

    /// Returns an iterator over the keys of the map, in arbitrary order.
    ///
    /// Items are of type `&'a K`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for key in map.keys() {
    ///     println!("{}", key);
    /// }
    /// ```
    pub fn keys(&self) -> impl Iterator<Item = &K> {
        self.entries.iter().map(|(key, _)| key)
    }

    /// Returns an iterator over the streams held by the map, in arbitrary
    /// order.
    ///
    /// Items are of type `&'a V`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for stream in map.values() {
    ///     println!("{:?}", stream);
    /// }
    /// ```
    pub fn values(&self) -> impl Iterator<Item = &V> {
        self.entries.iter().map(|(_, stream)| stream)
    }

    /// Returns an iterator over the streams held by the map that allows
    /// mutation, in arbitrary order.
    ///
    /// Items are of type `&'a mut V`.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// map.insert("a", pending::<i32>());
    /// map.insert("b", pending());
    /// map.insert("c", pending());
    ///
    /// for stream in map.values_mut() {
    ///     println!("{:?}", stream);
    /// }
    /// ```
    pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> {
        self.entries.iter_mut().map(|(_, stream)| stream)
    }

    /// Reports how many streams the map can store before it has to
    /// reallocate.
    ///
    /// The value is a lower bound: the map is guaranteed to fit at least this
    /// many streams and may be able to fit more.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, Pending};
    ///
    /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100);
    /// assert!(map.capacity() >= 100);
    /// ```
    pub fn capacity(&self) -> usize {
        self.entries.capacity()
    }

    /// Reports how many streams are currently stored in the map.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut a = StreamMap::new();
    /// assert_eq!(a.len(), 0);
    /// a.insert(1, pending::<i32>());
    /// assert_eq!(a.len(), 1);
    /// ```
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Reports whether the map currently stores no streams.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut a = StreamMap::new();
    /// assert!(a.is_empty());
    /// a.insert(1, pending::<i32>());
    /// assert!(!a.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Drops every key-stream pair while keeping the allocated storage around
    /// for later reuse.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut a = StreamMap::new();
    /// a.insert(1, pending::<i32>());
    /// a.clear();
    /// assert!(a.is_empty());
    /// ```
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Stores `stream` under the key `k`.
    ///
    /// Returns `None` when the key was not present. When the key was already
    /// present, the previously stored stream is taken out of the map and
    /// returned, and `stream` takes its place.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    ///
    /// assert!(map.insert(37, pending::<i32>()).is_none());
    /// assert!(!map.is_empty());
    ///
    /// map.insert(37, pending());
    /// assert!(map.insert(37, pending()).is_some());
    /// ```
    pub fn insert(&mut self, k: K, stream: V) -> Option<V>
    where
        K: Hash + Eq,
    {
        // Evict any stream already registered under this key first, so the
        // pushed pair is the only one carrying it.
        let previous = self.remove(&k);
        self.entries.push((k, stream));

        previous
    }

    /// Takes the stream stored under `k` out of the map.
    ///
    /// Returns the stream if the key was present and `None` otherwise.
    ///
    /// The key may be any borrowed form of the map's key type, but `Hash` and
    /// `Eq` on the borrowed form must match those for the key type.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    /// map.insert(1, pending::<i32>());
    /// assert!(map.remove(&1).is_some());
    /// assert!(map.remove(&1).is_none());
    /// ```
    pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V>
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        // Linear scan for the first entry whose key matches.
        let index = self
            .entries
            .iter()
            .position(|(key, _)| key.borrow() == k)?;

        // `swap_remove` moves the last entry into the vacated slot, so the
        // order of the remaining entries is not preserved.
        Some(self.entries.swap_remove(index).1)
    }

    /// Reports whether a stream is stored under `k`.
    ///
    /// The key may be any borrowed form of the map's key type, but `Hash` and
    /// `Eq` on the borrowed form must match those for the key type.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_stream::{StreamMap, pending};
    ///
    /// let mut map = StreamMap::new();
    /// map.insert(1, pending::<i32>());
    /// assert_eq!(map.contains_key(&1), true);
    /// assert_eq!(map.contains_key(&2), false);
    /// ```
    pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + Eq,
    {
        self.entries.iter().any(|(key, _)| key.borrow() == k)
    }
}
513 
514 impl<K, V> StreamMap<K, V>
515 where
516     K: Unpin,
517     V: Stream + Unpin,
518 {
519     /// Polls the next value, includes the vec entry index
poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>520     fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> {
521         use Poll::*;
522 
523         let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize;
524         let mut idx = start;
525 
526         for _ in 0..self.entries.len() {
527             let (_, stream) = &mut self.entries[idx];
528 
529             match Pin::new(stream).poll_next(cx) {
530                 Ready(Some(val)) => return Ready(Some((idx, val))),
531                 Ready(None) => {
532                     // Remove the entry
533                     self.entries.swap_remove(idx);
534 
535                     // Check if this was the last entry, if so the cursor needs
536                     // to wrap
537                     if idx == self.entries.len() {
538                         idx = 0;
539                     } else if idx < start && start <= self.entries.len() {
540                         // The stream being swapped into the current index has
541                         // already been polled, so skip it.
542                         idx = idx.wrapping_add(1) % self.entries.len();
543                     }
544                 }
545                 Pending => {
546                     idx = idx.wrapping_add(1) % self.entries.len();
547                 }
548             }
549         }
550 
551         // If the map is empty, then the stream is complete.
552         if self.entries.is_empty() {
553             Ready(None)
554         } else {
555             Pending
556         }
557     }
558 }
559 
560 impl<K, V> Default for StreamMap<K, V> {
default() -> Self561     fn default() -> Self {
562         Self::new()
563     }
564 }
565 
566 impl<K, V> Stream for StreamMap<K, V>
567 where
568     K: Clone + Unpin,
569     V: Stream + Unpin,
570 {
571     type Item = (K, V::Item);
572 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>573     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
574         if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) {
575             let key = self.entries[idx].0.clone();
576             Poll::Ready(Some((key, val)))
577         } else {
578             Poll::Ready(None)
579         }
580     }
581 
size_hint(&self) -> (usize, Option<usize>)582     fn size_hint(&self) -> (usize, Option<usize>) {
583         let mut ret = (0, Some(0));
584 
585         for (_, stream) in &self.entries {
586             let hint = stream.size_hint();
587 
588             ret.0 += hint.0;
589 
590             match (ret.1, hint.1) {
591                 (Some(a), Some(b)) => ret.1 = Some(a + b),
592                 (Some(_), None) => ret.1 = None,
593                 _ => {}
594             }
595         }
596 
597         ret
598     }
599 }
600 
601 impl<K, V> FromIterator<(K, V)> for StreamMap<K, V>
602 where
603     K: Hash + Eq,
604 {
from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self605     fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
606         let iterator = iter.into_iter();
607         let (lower_bound, _) = iterator.size_hint();
608         let mut stream_map = Self::with_capacity(lower_bound);
609 
610         for (key, value) in iterator {
611             stream_map.insert(key, value);
612         }
613 
614         stream_map
615     }
616 }
617 
618 impl<K, V> Extend<(K, V)> for StreamMap<K, V> {
extend<T>(&mut self, iter: T) where T: IntoIterator<Item = (K, V)>,619     fn extend<T>(&mut self, iter: T)
620     where
621         T: IntoIterator<Item = (K, V)>,
622     {
623         self.entries.extend(iter);
624     }
625 }
626 
mod rand {
    use std::cell::Cell;

    mod loom {
        /// Seed source used when not building under `loom`.
        #[cfg(not(loom))]
        pub(crate) mod rand {
            use std::collections::hash_map::RandomState;
            use std::hash::{BuildHasher, Hash, Hasher};
            use std::sync::atomic::{AtomicU32, Ordering};

            static COUNTER: AtomicU32 = AtomicU32::new(1);

            /// Produces a fresh seed by hashing a process-wide counter with a
            /// newly created `RandomState`.
            pub(crate) fn seed() -> u64 {
                let mut hasher = RandomState::new().build_hasher();

                // The counter changes on every call, which gives the hasher
                // some unique-ish input.
                COUNTER.fetch_add(1, Ordering::Relaxed).hash(&mut hasher);

                hasher.finish()
            }
        }

        /// Seed source used when building under `loom`: a fixed value.
        #[cfg(loom)]
        pub(crate) mod rand {
            pub(crate) fn seed() -> u64 {
                1
            }
        }
    }

    /// Fast random number generator.
    ///
    /// Implements xorshift64+: 2 32-bit xorshift sequences added together.
    /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's
    /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf>
    /// This generator passes the SmallCrush suite, part of TestU01 framework:
    /// <http://simul.iro.umontreal.ca/testu01/tu01.html>
    #[derive(Debug)]
    pub(crate) struct FastRand {
        one: Cell<u32>,
        two: Cell<u32>,
    }

    impl FastRand {
        /// Creates a generator whose two state words are the high and low
        /// 32-bit halves of `seed`.
        pub(crate) fn new(seed: u64) -> FastRand {
            let high = (seed >> 32) as u32;
            // Deliberate truncation to the low half. That word must not be
            // zero, so a zero is replaced by one.
            let low = (seed as u32).max(1);

            Self {
                one: Cell::new(high),
                two: Cell::new(low),
            }
        }

        /// Returns a pseudo-random value in `0..n`, or `0` when `n` is `0`.
        pub(crate) fn fastrand_n(&self, n: u32) -> u32 {
            // Multiply-shift range reduction: similar to `fastrand() % n`, but
            // faster.
            // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
            // The product of two 32-bit values always fits in 64 bits.
            let scaled = u64::from(self.fastrand()) * u64::from(n);
            (scaled >> 32) as u32
        }

        /// Advances the generator state and returns the next 32-bit output.
        fn fastrand(&self) -> u32 {
            let s0 = self.two.get();
            let mut s1 = self.one.get();

            s1 ^= s1 << 17;
            s1 ^= s0 ^ (s1 >> 7) ^ (s0 >> 16);

            self.one.set(s0);
            self.two.set(s1);

            s0.wrapping_add(s1)
        }
    }

    /// Returns a pseudo-random value in `0..n` (`0` when `n` is `0`) from a
    /// lazily seeded, thread-local generator.
    ///
    /// Used by `StreamMap`.
    pub(crate) fn thread_rng_n(n: u32) -> u32 {
        thread_local! {
            static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed());
        }

        THREAD_RNG.with(|rng| rng.fastrand_n(n))
    }
}
721