1 use crate::Stream; 2 3 use std::borrow::Borrow; 4 use std::hash::Hash; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 8 /// Combine many streams into one, indexing each source stream with a unique 9 /// key. 10 /// 11 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source 12 /// streams into a single merged stream that yields values in the order that 13 /// they arrive from the source streams. However, `StreamMap` has a lot more 14 /// flexibility in usage patterns. 15 /// 16 /// `StreamMap` can: 17 /// 18 /// * Merge an arbitrary number of streams. 19 /// * Track which source stream the value was received from. 20 /// * Handle inserting and removing streams from the set of managed streams at 21 /// any point during iteration. 22 /// 23 /// All source streams held by `StreamMap` are indexed using a key. This key is 24 /// included with the value when a source stream yields a value. The key is also 25 /// used to remove the stream from the `StreamMap` before the stream has 26 /// completed streaming. 27 /// 28 /// # `Unpin` 29 /// 30 /// Because the `StreamMap` API moves streams during runtime, both streams and 31 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a 32 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to 33 /// pin the stream in the heap. 34 /// 35 /// # Implementation 36 /// 37 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this 38 /// internal implementation detail will persist in future versions, but it is 39 /// important to know the runtime implications. In general, `StreamMap` works 40 /// best with a "smallish" number of streams as all entries are scanned on 41 /// insert, remove, and polling. In cases where a large number of streams need 42 /// to be merged, it may be advisable to use tasks sending values on a shared 43 /// [`mpsc`] channel. 44 /// 45 /// # Notes 46 /// 47 /// `StreamMap` removes finished streams automatically, without alerting the user. 48 /// In some scenarios, the caller would want to know on closed streams. 49 /// To do this, use [`StreamNotifyClose`] as a wrapper to your stream. 50 /// It will return None when the stream is closed. 51 /// 52 /// [`StreamExt::merge`]: crate::StreamExt::merge 53 /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html 54 /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html 55 /// [`Box::pin`]: std::boxed::Box::pin 56 /// [`StreamNotifyClose`]: crate::StreamNotifyClose 57 /// 58 /// # Examples 59 /// 60 /// Merging two streams, then remove them after receiving the first value 61 /// 62 /// ``` 63 /// use tokio_stream::{StreamExt, StreamMap, Stream}; 64 /// use tokio::sync::mpsc; 65 /// use std::pin::Pin; 66 /// 67 /// #[tokio::main] 68 /// async fn main() { 69 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); 70 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); 71 /// 72 /// // Convert the channels to a `Stream`. 73 /// let rx1 = Box::pin(async_stream::stream! { 74 /// while let Some(item) = rx1.recv().await { 75 /// yield item; 76 /// } 77 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; 78 /// 79 /// let rx2 = Box::pin(async_stream::stream! { 80 /// while let Some(item) = rx2.recv().await { 81 /// yield item; 82 /// } 83 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; 84 /// 85 /// tokio::spawn(async move { 86 /// tx1.send(1).await.unwrap(); 87 /// 88 /// // This value will never be received. The send may or may not return 89 /// // `Err` depending on if the remote end closed first or not. 90 /// let _ = tx1.send(2).await; 91 /// }); 92 /// 93 /// tokio::spawn(async move { 94 /// tx2.send(3).await.unwrap(); 95 /// let _ = tx2.send(4).await; 96 /// }); 97 /// 98 /// let mut map = StreamMap::new(); 99 /// 100 /// // Insert both streams 101 /// map.insert("one", rx1); 102 /// map.insert("two", rx2); 103 /// 104 /// // Read twice 105 /// for _ in 0..2 { 106 /// let (key, val) = map.next().await.unwrap(); 107 /// 108 /// if key == "one" { 109 /// assert_eq!(val, 1); 110 /// } else { 111 /// assert_eq!(val, 3); 112 /// } 113 /// 114 /// // Remove the stream to prevent reading the next value 115 /// map.remove(key); 116 /// } 117 /// } 118 /// ``` 119 /// 120 /// This example models a read-only client to a chat system with channels. The 121 /// client sends commands to join and leave channels. `StreamMap` is used to 122 /// manage active channel subscriptions. 123 /// 124 /// For simplicity, messages are displayed with `println!`, but they could be 125 /// sent to the client over a socket. 126 /// 127 /// ```no_run 128 /// use tokio_stream::{Stream, StreamExt, StreamMap}; 129 /// 130 /// enum Command { 131 /// Join(String), 132 /// Leave(String), 133 /// } 134 /// 135 /// fn commands() -> impl Stream<Item = Command> { 136 /// // Streams in user commands by parsing `stdin`. 137 /// # tokio_stream::pending() 138 /// } 139 /// 140 /// // Join a channel, returns a stream of messages received on the channel. 141 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { 142 /// // left as an exercise to the reader 143 /// # tokio_stream::pending() 144 /// } 145 /// 146 /// #[tokio::main] 147 /// async fn main() { 148 /// let mut channels = StreamMap::new(); 149 /// 150 /// // Input commands (join / leave channels). 151 /// let cmds = commands(); 152 /// tokio::pin!(cmds); 153 /// 154 /// loop { 155 /// tokio::select! { 156 /// Some(cmd) = cmds.next() => { 157 /// match cmd { 158 /// Command::Join(chan) => { 159 /// // Join the channel and add it to the `channels` 160 /// // stream map 161 /// let msgs = join(&chan); 162 /// channels.insert(chan, msgs); 163 /// } 164 /// Command::Leave(chan) => { 165 /// channels.remove(&chan); 166 /// } 167 /// } 168 /// } 169 /// Some((chan, msg)) = channels.next() => { 170 /// // Received a message, display it on stdout with the channel 171 /// // it originated from. 172 /// println!("{}: {}", chan, msg); 173 /// } 174 /// // Both the `commands` stream and the `channels` stream are 175 /// // complete. There is no more work to do, so leave the loop. 176 /// else => break, 177 /// } 178 /// } 179 /// } 180 /// ``` 181 /// 182 /// Using `StreamNotifyClose` to handle closed streams with `StreamMap`. 183 /// 184 /// ``` 185 /// use tokio_stream::{StreamExt, StreamMap, StreamNotifyClose}; 186 /// 187 /// #[tokio::main] 188 /// async fn main() { 189 /// let mut map = StreamMap::new(); 190 /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); 191 /// let stream2 = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1])); 192 /// map.insert(0, stream); 193 /// map.insert(1, stream2); 194 /// while let Some((key, val)) = map.next().await { 195 /// match val { 196 /// Some(val) => println!("got {val:?} from stream {key:?}"), 197 /// None => println!("stream {key:?} closed"), 198 /// } 199 /// } 200 /// } 201 /// ``` 202 203 #[derive(Debug)] 204 pub struct StreamMap<K, V> { 205 /// Streams stored in the map 206 entries: Vec<(K, V)>, 207 } 208 209 impl<K, V> StreamMap<K, V> { 210 /// An iterator visiting all key-value pairs in arbitrary order. 211 /// 212 /// The iterator element type is &'a (K, V). 213 /// 214 /// # Examples 215 /// 216 /// ``` 217 /// use tokio_stream::{StreamMap, pending}; 218 /// 219 /// let mut map = StreamMap::new(); 220 /// 221 /// map.insert("a", pending::<i32>()); 222 /// map.insert("b", pending()); 223 /// map.insert("c", pending()); 224 /// 225 /// for (key, stream) in map.iter() { 226 /// println!("({}, {:?})", key, stream); 227 /// } 228 /// ``` iter(&self) -> impl Iterator<Item = &(K, V)>229 pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { 230 self.entries.iter() 231 } 232 233 /// An iterator visiting all key-value pairs mutably in arbitrary order. 234 /// 235 /// The iterator element type is &'a mut (K, V). 236 /// 237 /// # Examples 238 /// 239 /// ``` 240 /// use tokio_stream::{StreamMap, pending}; 241 /// 242 /// let mut map = StreamMap::new(); 243 /// 244 /// map.insert("a", pending::<i32>()); 245 /// map.insert("b", pending()); 246 /// map.insert("c", pending()); 247 /// 248 /// for (key, stream) in map.iter_mut() { 249 /// println!("({}, {:?})", key, stream); 250 /// } 251 /// ``` iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)>252 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { 253 self.entries.iter_mut() 254 } 255 256 /// Creates an empty `StreamMap`. 257 /// 258 /// The stream map is initially created with a capacity of `0`, so it will 259 /// not allocate until it is first inserted into. 260 /// 261 /// # Examples 262 /// 263 /// ``` 264 /// use tokio_stream::{StreamMap, Pending}; 265 /// 266 /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); 267 /// ``` new() -> StreamMap<K, V>268 pub fn new() -> StreamMap<K, V> { 269 StreamMap { entries: vec![] } 270 } 271 272 /// Creates an empty `StreamMap` with the specified capacity. 273 /// 274 /// The stream map will be able to hold at least `capacity` elements without 275 /// reallocating. If `capacity` is 0, the stream map will not allocate. 276 /// 277 /// # Examples 278 /// 279 /// ``` 280 /// use tokio_stream::{StreamMap, Pending}; 281 /// 282 /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); 283 /// ``` with_capacity(capacity: usize) -> StreamMap<K, V>284 pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { 285 StreamMap { 286 entries: Vec::with_capacity(capacity), 287 } 288 } 289 290 /// Returns an iterator visiting all keys in arbitrary order. 291 /// 292 /// The iterator element type is &'a K. 293 /// 294 /// # Examples 295 /// 296 /// ``` 297 /// use tokio_stream::{StreamMap, pending}; 298 /// 299 /// let mut map = StreamMap::new(); 300 /// 301 /// map.insert("a", pending::<i32>()); 302 /// map.insert("b", pending()); 303 /// map.insert("c", pending()); 304 /// 305 /// for key in map.keys() { 306 /// println!("{}", key); 307 /// } 308 /// ``` keys(&self) -> impl Iterator<Item = &K>309 pub fn keys(&self) -> impl Iterator<Item = &K> { 310 self.iter().map(|(k, _)| k) 311 } 312 313 /// An iterator visiting all values in arbitrary order. 314 /// 315 /// The iterator element type is &'a V. 316 /// 317 /// # Examples 318 /// 319 /// ``` 320 /// use tokio_stream::{StreamMap, pending}; 321 /// 322 /// let mut map = StreamMap::new(); 323 /// 324 /// map.insert("a", pending::<i32>()); 325 /// map.insert("b", pending()); 326 /// map.insert("c", pending()); 327 /// 328 /// for stream in map.values() { 329 /// println!("{:?}", stream); 330 /// } 331 /// ``` values(&self) -> impl Iterator<Item = &V>332 pub fn values(&self) -> impl Iterator<Item = &V> { 333 self.iter().map(|(_, v)| v) 334 } 335 336 /// An iterator visiting all values mutably in arbitrary order. 337 /// 338 /// The iterator element type is &'a mut V. 339 /// 340 /// # Examples 341 /// 342 /// ``` 343 /// use tokio_stream::{StreamMap, pending}; 344 /// 345 /// let mut map = StreamMap::new(); 346 /// 347 /// map.insert("a", pending::<i32>()); 348 /// map.insert("b", pending()); 349 /// map.insert("c", pending()); 350 /// 351 /// for stream in map.values_mut() { 352 /// println!("{:?}", stream); 353 /// } 354 /// ``` values_mut(&mut self) -> impl Iterator<Item = &mut V>355 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { 356 self.iter_mut().map(|(_, v)| v) 357 } 358 359 /// Returns the number of streams the map can hold without reallocating. 360 /// 361 /// This number is a lower bound; the `StreamMap` might be able to hold 362 /// more, but is guaranteed to be able to hold at least this many. 363 /// 364 /// # Examples 365 /// 366 /// ``` 367 /// use tokio_stream::{StreamMap, Pending}; 368 /// 369 /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); 370 /// assert!(map.capacity() >= 100); 371 /// ``` capacity(&self) -> usize372 pub fn capacity(&self) -> usize { 373 self.entries.capacity() 374 } 375 376 /// Returns the number of streams in the map. 377 /// 378 /// # Examples 379 /// 380 /// ``` 381 /// use tokio_stream::{StreamMap, pending}; 382 /// 383 /// let mut a = StreamMap::new(); 384 /// assert_eq!(a.len(), 0); 385 /// a.insert(1, pending::<i32>()); 386 /// assert_eq!(a.len(), 1); 387 /// ``` len(&self) -> usize388 pub fn len(&self) -> usize { 389 self.entries.len() 390 } 391 392 /// Returns `true` if the map contains no elements. 393 /// 394 /// # Examples 395 /// 396 /// ``` 397 /// use tokio_stream::{StreamMap, pending}; 398 /// 399 /// let mut a = StreamMap::new(); 400 /// assert!(a.is_empty()); 401 /// a.insert(1, pending::<i32>()); 402 /// assert!(!a.is_empty()); 403 /// ``` is_empty(&self) -> bool404 pub fn is_empty(&self) -> bool { 405 self.entries.is_empty() 406 } 407 408 /// Clears the map, removing all key-stream pairs. Keeps the allocated 409 /// memory for reuse. 410 /// 411 /// # Examples 412 /// 413 /// ``` 414 /// use tokio_stream::{StreamMap, pending}; 415 /// 416 /// let mut a = StreamMap::new(); 417 /// a.insert(1, pending::<i32>()); 418 /// a.clear(); 419 /// assert!(a.is_empty()); 420 /// ``` clear(&mut self)421 pub fn clear(&mut self) { 422 self.entries.clear(); 423 } 424 425 /// Insert a key-stream pair into the map. 426 /// 427 /// If the map did not have this key present, `None` is returned. 428 /// 429 /// If the map did have this key present, the new `stream` replaces the old 430 /// one and the old stream is returned. 431 /// 432 /// # Examples 433 /// 434 /// ``` 435 /// use tokio_stream::{StreamMap, pending}; 436 /// 437 /// let mut map = StreamMap::new(); 438 /// 439 /// assert!(map.insert(37, pending::<i32>()).is_none()); 440 /// assert!(!map.is_empty()); 441 /// 442 /// map.insert(37, pending()); 443 /// assert!(map.insert(37, pending()).is_some()); 444 /// ``` insert(&mut self, k: K, stream: V) -> Option<V> where K: Hash + Eq,445 pub fn insert(&mut self, k: K, stream: V) -> Option<V> 446 where 447 K: Hash + Eq, 448 { 449 let ret = self.remove(&k); 450 self.entries.push((k, stream)); 451 452 ret 453 } 454 455 /// Removes a key from the map, returning the stream at the key if the key was previously in the map. 456 /// 457 /// The key may be any borrowed form of the map's key type, but `Hash` and 458 /// `Eq` on the borrowed form must match those for the key type. 459 /// 460 /// # Examples 461 /// 462 /// ``` 463 /// use tokio_stream::{StreamMap, pending}; 464 /// 465 /// let mut map = StreamMap::new(); 466 /// map.insert(1, pending::<i32>()); 467 /// assert!(map.remove(&1).is_some()); 468 /// assert!(map.remove(&1).is_none()); 469 /// ``` remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq,470 pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> 471 where 472 K: Borrow<Q>, 473 Q: Hash + Eq, 474 { 475 for i in 0..self.entries.len() { 476 if self.entries[i].0.borrow() == k { 477 return Some(self.entries.swap_remove(i).1); 478 } 479 } 480 481 None 482 } 483 484 /// Returns `true` if the map contains a stream for the specified key. 485 /// 486 /// The key may be any borrowed form of the map's key type, but `Hash` and 487 /// `Eq` on the borrowed form must match those for the key type. 488 /// 489 /// # Examples 490 /// 491 /// ``` 492 /// use tokio_stream::{StreamMap, pending}; 493 /// 494 /// let mut map = StreamMap::new(); 495 /// map.insert(1, pending::<i32>()); 496 /// assert_eq!(map.contains_key(&1), true); 497 /// assert_eq!(map.contains_key(&2), false); 498 /// ``` contains_key<Q: ?Sized>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq,499 pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool 500 where 501 K: Borrow<Q>, 502 Q: Hash + Eq, 503 { 504 for i in 0..self.entries.len() { 505 if self.entries[i].0.borrow() == k { 506 return true; 507 } 508 } 509 510 false 511 } 512 } 513 514 impl<K, V> StreamMap<K, V> 515 where 516 K: Unpin, 517 V: Stream + Unpin, 518 { 519 /// Polls the next value, includes the vec entry index poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>520 fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { 521 use Poll::*; 522 523 let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; 524 let mut idx = start; 525 526 for _ in 0..self.entries.len() { 527 let (_, stream) = &mut self.entries[idx]; 528 529 match Pin::new(stream).poll_next(cx) { 530 Ready(Some(val)) => return Ready(Some((idx, val))), 531 Ready(None) => { 532 // Remove the entry 533 self.entries.swap_remove(idx); 534 535 // Check if this was the last entry, if so the cursor needs 536 // to wrap 537 if idx == self.entries.len() { 538 idx = 0; 539 } else if idx < start && start <= self.entries.len() { 540 // The stream being swapped into the current index has 541 // already been polled, so skip it. 542 idx = idx.wrapping_add(1) % self.entries.len(); 543 } 544 } 545 Pending => { 546 idx = idx.wrapping_add(1) % self.entries.len(); 547 } 548 } 549 } 550 551 // If the map is empty, then the stream is complete. 552 if self.entries.is_empty() { 553 Ready(None) 554 } else { 555 Pending 556 } 557 } 558 } 559 560 impl<K, V> Default for StreamMap<K, V> { default() -> Self561 fn default() -> Self { 562 Self::new() 563 } 564 } 565 566 impl<K, V> Stream for StreamMap<K, V> 567 where 568 K: Clone + Unpin, 569 V: Stream + Unpin, 570 { 571 type Item = (K, V::Item); 572 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>573 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 574 if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { 575 let key = self.entries[idx].0.clone(); 576 Poll::Ready(Some((key, val))) 577 } else { 578 Poll::Ready(None) 579 } 580 } 581 size_hint(&self) -> (usize, Option<usize>)582 fn size_hint(&self) -> (usize, Option<usize>) { 583 let mut ret = (0, Some(0)); 584 585 for (_, stream) in &self.entries { 586 let hint = stream.size_hint(); 587 588 ret.0 += hint.0; 589 590 match (ret.1, hint.1) { 591 (Some(a), Some(b)) => ret.1 = Some(a + b), 592 (Some(_), None) => ret.1 = None, 593 _ => {} 594 } 595 } 596 597 ret 598 } 599 } 600 601 impl<K, V> FromIterator<(K, V)> for StreamMap<K, V> 602 where 603 K: Hash + Eq, 604 { from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self605 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { 606 let iterator = iter.into_iter(); 607 let (lower_bound, _) = iterator.size_hint(); 608 let mut stream_map = Self::with_capacity(lower_bound); 609 610 for (key, value) in iterator { 611 stream_map.insert(key, value); 612 } 613 614 stream_map 615 } 616 } 617 618 impl<K, V> Extend<(K, V)> for StreamMap<K, V> { extend<T>(&mut self, iter: T) where T: IntoIterator<Item = (K, V)>,619 fn extend<T>(&mut self, iter: T) 620 where 621 T: IntoIterator<Item = (K, V)>, 622 { 623 self.entries.extend(iter); 624 } 625 } 626 627 mod rand { 628 use std::cell::Cell; 629 630 mod loom { 631 #[cfg(not(loom))] 632 pub(crate) mod rand { 633 use std::collections::hash_map::RandomState; 634 use std::hash::{BuildHasher, Hash, Hasher}; 635 use std::sync::atomic::AtomicU32; 636 use std::sync::atomic::Ordering::Relaxed; 637 638 static COUNTER: AtomicU32 = AtomicU32::new(1); 639 seed() -> u64640 pub(crate) fn seed() -> u64 { 641 let rand_state = RandomState::new(); 642 643 let mut hasher = rand_state.build_hasher(); 644 645 // Hash some unique-ish data to generate some new state 646 COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); 647 648 // Get the seed 649 hasher.finish() 650 } 651 } 652 653 #[cfg(loom)] 654 pub(crate) mod rand { seed() -> u64655 pub(crate) fn seed() -> u64 { 656 1 657 } 658 } 659 } 660 661 /// Fast random number generate 662 /// 663 /// Implement xorshift64+: 2 32-bit xorshift sequences added together. 664 /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's 665 /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> 666 /// This generator passes the SmallCrush suite, part of TestU01 framework: 667 /// <http://simul.iro.umontreal.ca/testu01/tu01.html> 668 #[derive(Debug)] 669 pub(crate) struct FastRand { 670 one: Cell<u32>, 671 two: Cell<u32>, 672 } 673 674 impl FastRand { 675 /// Initialize a new, thread-local, fast random number generator. new(seed: u64) -> FastRand676 pub(crate) fn new(seed: u64) -> FastRand { 677 let one = (seed >> 32) as u32; 678 let mut two = seed as u32; 679 680 if two == 0 { 681 // This value cannot be zero 682 two = 1; 683 } 684 685 FastRand { 686 one: Cell::new(one), 687 two: Cell::new(two), 688 } 689 } 690 fastrand_n(&self, n: u32) -> u32691 pub(crate) fn fastrand_n(&self, n: u32) -> u32 { 692 // This is similar to fastrand() % n, but faster. 693 // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ 694 let mul = (self.fastrand() as u64).wrapping_mul(n as u64); 695 (mul >> 32) as u32 696 } 697 fastrand(&self) -> u32698 fn fastrand(&self) -> u32 { 699 let mut s1 = self.one.get(); 700 let s0 = self.two.get(); 701 702 s1 ^= s1 << 17; 703 s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; 704 705 self.one.set(s0); 706 self.two.set(s1); 707 708 s0.wrapping_add(s1) 709 } 710 } 711 712 // Used by `StreamMap` thread_rng_n(n: u32) -> u32713 pub(crate) fn thread_rng_n(n: u32) -> u32 { 714 thread_local! { 715 static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); 716 } 717 718 THREAD_RNG.with(|rng| rng.fastrand_n(n)) 719 } 720 } 721