1 use crate::Stream; 2 3 use std::borrow::Borrow; 4 use std::hash::Hash; 5 use std::pin::Pin; 6 use std::task::{Context, Poll}; 7 8 /// Combine many streams into one, indexing each source stream with a unique 9 /// key. 10 /// 11 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source 12 /// streams into a single merged stream that yields values in the order that 13 /// they arrive from the source streams. However, `StreamMap` has a lot more 14 /// flexibility in usage patterns. 15 /// 16 /// `StreamMap` can: 17 /// 18 /// * Merge an arbitrary number of streams. 19 /// * Track which source stream the value was received from. 20 /// * Handle inserting and removing streams from the set of managed streams at 21 /// any point during iteration. 22 /// 23 /// All source streams held by `StreamMap` are indexed using a key. This key is 24 /// included with the value when a source stream yields a value. The key is also 25 /// used to remove the stream from the `StreamMap` before the stream has 26 /// completed streaming. 27 /// 28 /// # `Unpin` 29 /// 30 /// Because the `StreamMap` API moves streams during runtime, both streams and 31 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a 32 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to 33 /// pin the stream in the heap. 34 /// 35 /// # Implementation 36 /// 37 /// `StreamMap` is backed by a `Vec<(K, V)>`. There is no guarantee that this 38 /// internal implementation detail will persist in future versions, but it is 39 /// important to know the runtime implications. In general, `StreamMap` works 40 /// best with a "smallish" number of streams as all entries are scanned on 41 /// insert, remove, and polling. In cases where a large number of streams need 42 /// to be merged, it may be advisable to use tasks sending values on a shared 43 /// [`mpsc`] channel. 44 /// 45 /// [`StreamExt::merge`]: crate::StreamExt::merge 46 /// [`mpsc`]: https://docs.rs/tokio/1.0/tokio/sync/mpsc/index.html 47 /// [`pin!`]: https://docs.rs/tokio/1.0/tokio/macro.pin.html 48 /// [`Box::pin`]: std::boxed::Box::pin 49 /// 50 /// # Examples 51 /// 52 /// Merging two streams, then remove them after receiving the first value 53 /// 54 /// ``` 55 /// use tokio_stream::{StreamExt, StreamMap, Stream}; 56 /// use tokio::sync::mpsc; 57 /// use std::pin::Pin; 58 /// 59 /// #[tokio::main] 60 /// async fn main() { 61 /// let (tx1, mut rx1) = mpsc::channel::<usize>(10); 62 /// let (tx2, mut rx2) = mpsc::channel::<usize>(10); 63 /// 64 /// // Convert the channels to a `Stream`. 65 /// let rx1 = Box::pin(async_stream::stream! { 66 /// while let Some(item) = rx1.recv().await { 67 /// yield item; 68 /// } 69 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; 70 /// 71 /// let rx2 = Box::pin(async_stream::stream! { 72 /// while let Some(item) = rx2.recv().await { 73 /// yield item; 74 /// } 75 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>; 76 /// 77 /// tokio::spawn(async move { 78 /// tx1.send(1).await.unwrap(); 79 /// 80 /// // This value will never be received. The send may or may not return 81 /// // `Err` depending on if the remote end closed first or not. 82 /// let _ = tx1.send(2).await; 83 /// }); 84 /// 85 /// tokio::spawn(async move { 86 /// tx2.send(3).await.unwrap(); 87 /// let _ = tx2.send(4).await; 88 /// }); 89 /// 90 /// let mut map = StreamMap::new(); 91 /// 92 /// // Insert both streams 93 /// map.insert("one", rx1); 94 /// map.insert("two", rx2); 95 /// 96 /// // Read twice 97 /// for _ in 0..2 { 98 /// let (key, val) = map.next().await.unwrap(); 99 /// 100 /// if key == "one" { 101 /// assert_eq!(val, 1); 102 /// } else { 103 /// assert_eq!(val, 3); 104 /// } 105 /// 106 /// // Remove the stream to prevent reading the next value 107 /// map.remove(key); 108 /// } 109 /// } 110 /// ``` 111 /// 112 /// This example models a read-only client to a chat system with channels. The 113 /// client sends commands to join and leave channels. `StreamMap` is used to 114 /// manage active channel subscriptions. 115 /// 116 /// For simplicity, messages are displayed with `println!`, but they could be 117 /// sent to the client over a socket. 118 /// 119 /// ```no_run 120 /// use tokio_stream::{Stream, StreamExt, StreamMap}; 121 /// 122 /// enum Command { 123 /// Join(String), 124 /// Leave(String), 125 /// } 126 /// 127 /// fn commands() -> impl Stream<Item = Command> { 128 /// // Streams in user commands by parsing `stdin`. 129 /// # tokio_stream::pending() 130 /// } 131 /// 132 /// // Join a channel, returns a stream of messages received on the channel. 133 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin { 134 /// // left as an exercise to the reader 135 /// # tokio_stream::pending() 136 /// } 137 /// 138 /// #[tokio::main] 139 /// async fn main() { 140 /// let mut channels = StreamMap::new(); 141 /// 142 /// // Input commands (join / leave channels). 143 /// let cmds = commands(); 144 /// tokio::pin!(cmds); 145 /// 146 /// loop { 147 /// tokio::select! { 148 /// Some(cmd) = cmds.next() => { 149 /// match cmd { 150 /// Command::Join(chan) => { 151 /// // Join the channel and add it to the `channels` 152 /// // stream map 153 /// let msgs = join(&chan); 154 /// channels.insert(chan, msgs); 155 /// } 156 /// Command::Leave(chan) => { 157 /// channels.remove(&chan); 158 /// } 159 /// } 160 /// } 161 /// Some((chan, msg)) = channels.next() => { 162 /// // Received a message, display it on stdout with the channel 163 /// // it originated from. 164 /// println!("{}: {}", chan, msg); 165 /// } 166 /// // Both the `commands` stream and the `channels` stream are 167 /// // complete. There is no more work to do, so leave the loop. 168 /// else => break, 169 /// } 170 /// } 171 /// } 172 /// ``` 173 #[derive(Debug)] 174 pub struct StreamMap<K, V> { 175 /// Streams stored in the map 176 entries: Vec<(K, V)>, 177 } 178 179 impl<K, V> StreamMap<K, V> { 180 /// An iterator visiting all key-value pairs in arbitrary order. 181 /// 182 /// The iterator element type is &'a (K, V). 183 /// 184 /// # Examples 185 /// 186 /// ``` 187 /// use tokio_stream::{StreamMap, pending}; 188 /// 189 /// let mut map = StreamMap::new(); 190 /// 191 /// map.insert("a", pending::<i32>()); 192 /// map.insert("b", pending()); 193 /// map.insert("c", pending()); 194 /// 195 /// for (key, stream) in map.iter() { 196 /// println!("({}, {:?})", key, stream); 197 /// } 198 /// ``` iter(&self) -> impl Iterator<Item = &(K, V)>199 pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { 200 self.entries.iter() 201 } 202 203 /// An iterator visiting all key-value pairs mutably in arbitrary order. 204 /// 205 /// The iterator element type is &'a mut (K, V). 206 /// 207 /// # Examples 208 /// 209 /// ``` 210 /// use tokio_stream::{StreamMap, pending}; 211 /// 212 /// let mut map = StreamMap::new(); 213 /// 214 /// map.insert("a", pending::<i32>()); 215 /// map.insert("b", pending()); 216 /// map.insert("c", pending()); 217 /// 218 /// for (key, stream) in map.iter_mut() { 219 /// println!("({}, {:?})", key, stream); 220 /// } 221 /// ``` iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)>222 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { 223 self.entries.iter_mut() 224 } 225 226 /// Creates an empty `StreamMap`. 227 /// 228 /// The stream map is initially created with a capacity of `0`, so it will 229 /// not allocate until it is first inserted into. 230 /// 231 /// # Examples 232 /// 233 /// ``` 234 /// use tokio_stream::{StreamMap, Pending}; 235 /// 236 /// let map: StreamMap<&str, Pending<()>> = StreamMap::new(); 237 /// ``` new() -> StreamMap<K, V>238 pub fn new() -> StreamMap<K, V> { 239 StreamMap { entries: vec![] } 240 } 241 242 /// Creates an empty `StreamMap` with the specified capacity. 243 /// 244 /// The stream map will be able to hold at least `capacity` elements without 245 /// reallocating. If `capacity` is 0, the stream map will not allocate. 246 /// 247 /// # Examples 248 /// 249 /// ``` 250 /// use tokio_stream::{StreamMap, Pending}; 251 /// 252 /// let map: StreamMap<&str, Pending<()>> = StreamMap::with_capacity(10); 253 /// ``` with_capacity(capacity: usize) -> StreamMap<K, V>254 pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { 255 StreamMap { 256 entries: Vec::with_capacity(capacity), 257 } 258 } 259 260 /// Returns an iterator visiting all keys in arbitrary order. 261 /// 262 /// The iterator element type is &'a K. 263 /// 264 /// # Examples 265 /// 266 /// ``` 267 /// use tokio_stream::{StreamMap, pending}; 268 /// 269 /// let mut map = StreamMap::new(); 270 /// 271 /// map.insert("a", pending::<i32>()); 272 /// map.insert("b", pending()); 273 /// map.insert("c", pending()); 274 /// 275 /// for key in map.keys() { 276 /// println!("{}", key); 277 /// } 278 /// ``` keys(&self) -> impl Iterator<Item = &K>279 pub fn keys(&self) -> impl Iterator<Item = &K> { 280 self.iter().map(|(k, _)| k) 281 } 282 283 /// An iterator visiting all values in arbitrary order. 284 /// 285 /// The iterator element type is &'a V. 286 /// 287 /// # Examples 288 /// 289 /// ``` 290 /// use tokio_stream::{StreamMap, pending}; 291 /// 292 /// let mut map = StreamMap::new(); 293 /// 294 /// map.insert("a", pending::<i32>()); 295 /// map.insert("b", pending()); 296 /// map.insert("c", pending()); 297 /// 298 /// for stream in map.values() { 299 /// println!("{:?}", stream); 300 /// } 301 /// ``` values(&self) -> impl Iterator<Item = &V>302 pub fn values(&self) -> impl Iterator<Item = &V> { 303 self.iter().map(|(_, v)| v) 304 } 305 306 /// An iterator visiting all values mutably in arbitrary order. 307 /// 308 /// The iterator element type is &'a mut V. 309 /// 310 /// # Examples 311 /// 312 /// ``` 313 /// use tokio_stream::{StreamMap, pending}; 314 /// 315 /// let mut map = StreamMap::new(); 316 /// 317 /// map.insert("a", pending::<i32>()); 318 /// map.insert("b", pending()); 319 /// map.insert("c", pending()); 320 /// 321 /// for stream in map.values_mut() { 322 /// println!("{:?}", stream); 323 /// } 324 /// ``` values_mut(&mut self) -> impl Iterator<Item = &mut V>325 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { 326 self.iter_mut().map(|(_, v)| v) 327 } 328 329 /// Returns the number of streams the map can hold without reallocating. 330 /// 331 /// This number is a lower bound; the `StreamMap` might be able to hold 332 /// more, but is guaranteed to be able to hold at least this many. 333 /// 334 /// # Examples 335 /// 336 /// ``` 337 /// use tokio_stream::{StreamMap, Pending}; 338 /// 339 /// let map: StreamMap<i32, Pending<()>> = StreamMap::with_capacity(100); 340 /// assert!(map.capacity() >= 100); 341 /// ``` capacity(&self) -> usize342 pub fn capacity(&self) -> usize { 343 self.entries.capacity() 344 } 345 346 /// Returns the number of streams in the map. 347 /// 348 /// # Examples 349 /// 350 /// ``` 351 /// use tokio_stream::{StreamMap, pending}; 352 /// 353 /// let mut a = StreamMap::new(); 354 /// assert_eq!(a.len(), 0); 355 /// a.insert(1, pending::<i32>()); 356 /// assert_eq!(a.len(), 1); 357 /// ``` len(&self) -> usize358 pub fn len(&self) -> usize { 359 self.entries.len() 360 } 361 362 /// Returns `true` if the map contains no elements. 363 /// 364 /// # Examples 365 /// 366 /// ``` 367 /// use tokio_stream::{StreamMap, pending}; 368 /// 369 /// let mut a = StreamMap::new(); 370 /// assert!(a.is_empty()); 371 /// a.insert(1, pending::<i32>()); 372 /// assert!(!a.is_empty()); 373 /// ``` is_empty(&self) -> bool374 pub fn is_empty(&self) -> bool { 375 self.entries.is_empty() 376 } 377 378 /// Clears the map, removing all key-stream pairs. Keeps the allocated 379 /// memory for reuse. 380 /// 381 /// # Examples 382 /// 383 /// ``` 384 /// use tokio_stream::{StreamMap, pending}; 385 /// 386 /// let mut a = StreamMap::new(); 387 /// a.insert(1, pending::<i32>()); 388 /// a.clear(); 389 /// assert!(a.is_empty()); 390 /// ``` clear(&mut self)391 pub fn clear(&mut self) { 392 self.entries.clear(); 393 } 394 395 /// Insert a key-stream pair into the map. 396 /// 397 /// If the map did not have this key present, `None` is returned. 398 /// 399 /// If the map did have this key present, the new `stream` replaces the old 400 /// one and the old stream is returned. 401 /// 402 /// # Examples 403 /// 404 /// ``` 405 /// use tokio_stream::{StreamMap, pending}; 406 /// 407 /// let mut map = StreamMap::new(); 408 /// 409 /// assert!(map.insert(37, pending::<i32>()).is_none()); 410 /// assert!(!map.is_empty()); 411 /// 412 /// map.insert(37, pending()); 413 /// assert!(map.insert(37, pending()).is_some()); 414 /// ``` insert(&mut self, k: K, stream: V) -> Option<V> where K: Hash + Eq,415 pub fn insert(&mut self, k: K, stream: V) -> Option<V> 416 where 417 K: Hash + Eq, 418 { 419 let ret = self.remove(&k); 420 self.entries.push((k, stream)); 421 422 ret 423 } 424 425 /// Removes a key from the map, returning the stream at the key if the key was previously in the map. 426 /// 427 /// The key may be any borrowed form of the map's key type, but `Hash` and 428 /// `Eq` on the borrowed form must match those for the key type. 429 /// 430 /// # Examples 431 /// 432 /// ``` 433 /// use tokio_stream::{StreamMap, pending}; 434 /// 435 /// let mut map = StreamMap::new(); 436 /// map.insert(1, pending::<i32>()); 437 /// assert!(map.remove(&1).is_some()); 438 /// assert!(map.remove(&1).is_none()); 439 /// ``` remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> where K: Borrow<Q>, Q: Hash + Eq,440 pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> 441 where 442 K: Borrow<Q>, 443 Q: Hash + Eq, 444 { 445 for i in 0..self.entries.len() { 446 if self.entries[i].0.borrow() == k { 447 return Some(self.entries.swap_remove(i).1); 448 } 449 } 450 451 None 452 } 453 454 /// Returns `true` if the map contains a stream for the specified key. 455 /// 456 /// The key may be any borrowed form of the map's key type, but `Hash` and 457 /// `Eq` on the borrowed form must match those for the key type. 458 /// 459 /// # Examples 460 /// 461 /// ``` 462 /// use tokio_stream::{StreamMap, pending}; 463 /// 464 /// let mut map = StreamMap::new(); 465 /// map.insert(1, pending::<i32>()); 466 /// assert_eq!(map.contains_key(&1), true); 467 /// assert_eq!(map.contains_key(&2), false); 468 /// ``` contains_key<Q: ?Sized>(&self, k: &Q) -> bool where K: Borrow<Q>, Q: Hash + Eq,469 pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool 470 where 471 K: Borrow<Q>, 472 Q: Hash + Eq, 473 { 474 for i in 0..self.entries.len() { 475 if self.entries[i].0.borrow() == k { 476 return true; 477 } 478 } 479 480 false 481 } 482 } 483 484 impl<K, V> StreamMap<K, V> 485 where 486 K: Unpin, 487 V: Stream + Unpin, 488 { 489 /// Polls the next value, includes the vec entry index poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>>490 fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { 491 use Poll::*; 492 493 let start = self::rand::thread_rng_n(self.entries.len() as u32) as usize; 494 let mut idx = start; 495 496 for _ in 0..self.entries.len() { 497 let (_, stream) = &mut self.entries[idx]; 498 499 match Pin::new(stream).poll_next(cx) { 500 Ready(Some(val)) => return Ready(Some((idx, val))), 501 Ready(None) => { 502 // Remove the entry 503 self.entries.swap_remove(idx); 504 505 // Check if this was the last entry, if so the cursor needs 506 // to wrap 507 if idx == self.entries.len() { 508 idx = 0; 509 } else if idx < start && start <= self.entries.len() { 510 // The stream being swapped into the current index has 511 // already been polled, so skip it. 512 idx = idx.wrapping_add(1) % self.entries.len(); 513 } 514 } 515 Pending => { 516 idx = idx.wrapping_add(1) % self.entries.len(); 517 } 518 } 519 } 520 521 // If the map is empty, then the stream is complete. 522 if self.entries.is_empty() { 523 Ready(None) 524 } else { 525 Pending 526 } 527 } 528 } 529 530 impl<K, V> Default for StreamMap<K, V> { default() -> Self531 fn default() -> Self { 532 Self::new() 533 } 534 } 535 536 impl<K, V> Stream for StreamMap<K, V> 537 where 538 K: Clone + Unpin, 539 V: Stream + Unpin, 540 { 541 type Item = (K, V::Item); 542 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>543 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 544 if let Some((idx, val)) = ready!(self.poll_next_entry(cx)) { 545 let key = self.entries[idx].0.clone(); 546 Poll::Ready(Some((key, val))) 547 } else { 548 Poll::Ready(None) 549 } 550 } 551 size_hint(&self) -> (usize, Option<usize>)552 fn size_hint(&self) -> (usize, Option<usize>) { 553 let mut ret = (0, Some(0)); 554 555 for (_, stream) in &self.entries { 556 let hint = stream.size_hint(); 557 558 ret.0 += hint.0; 559 560 match (ret.1, hint.1) { 561 (Some(a), Some(b)) => ret.1 = Some(a + b), 562 (Some(_), None) => ret.1 = None, 563 _ => {} 564 } 565 } 566 567 ret 568 } 569 } 570 571 impl<K, V> std::iter::FromIterator<(K, V)> for StreamMap<K, V> 572 where 573 K: Hash + Eq, 574 { from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self575 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { 576 let iterator = iter.into_iter(); 577 let (lower_bound, _) = iterator.size_hint(); 578 let mut stream_map = Self::with_capacity(lower_bound); 579 580 for (key, value) in iterator { 581 stream_map.insert(key, value); 582 } 583 584 stream_map 585 } 586 } 587 588 mod rand { 589 use std::cell::Cell; 590 591 mod loom { 592 #[cfg(not(loom))] 593 pub(crate) mod rand { 594 use std::collections::hash_map::RandomState; 595 use std::hash::{BuildHasher, Hash, Hasher}; 596 use std::sync::atomic::AtomicU32; 597 use std::sync::atomic::Ordering::Relaxed; 598 599 static COUNTER: AtomicU32 = AtomicU32::new(1); 600 seed() -> u64601 pub(crate) fn seed() -> u64 { 602 let rand_state = RandomState::new(); 603 604 let mut hasher = rand_state.build_hasher(); 605 606 // Hash some unique-ish data to generate some new state 607 COUNTER.fetch_add(1, Relaxed).hash(&mut hasher); 608 609 // Get the seed 610 hasher.finish() 611 } 612 } 613 614 #[cfg(loom)] 615 pub(crate) mod rand { seed() -> u64616 pub(crate) fn seed() -> u64 { 617 1 618 } 619 } 620 } 621 622 /// Fast random number generate 623 /// 624 /// Implement xorshift64+: 2 32-bit xorshift sequences added together. 625 /// Shift triplet `[17,7,16]` was calculated as indicated in Marsaglia's 626 /// Xorshift paper: <https://www.jstatsoft.org/article/view/v008i14/xorshift.pdf> 627 /// This generator passes the SmallCrush suite, part of TestU01 framework: 628 /// <http://simul.iro.umontreal.ca/testu01/tu01.html> 629 #[derive(Debug)] 630 pub(crate) struct FastRand { 631 one: Cell<u32>, 632 two: Cell<u32>, 633 } 634 635 impl FastRand { 636 /// Initialize a new, thread-local, fast random number generator. new(seed: u64) -> FastRand637 pub(crate) fn new(seed: u64) -> FastRand { 638 let one = (seed >> 32) as u32; 639 let mut two = seed as u32; 640 641 if two == 0 { 642 // This value cannot be zero 643 two = 1; 644 } 645 646 FastRand { 647 one: Cell::new(one), 648 two: Cell::new(two), 649 } 650 } 651 fastrand_n(&self, n: u32) -> u32652 pub(crate) fn fastrand_n(&self, n: u32) -> u32 { 653 // This is similar to fastrand() % n, but faster. 654 // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ 655 let mul = (self.fastrand() as u64).wrapping_mul(n as u64); 656 (mul >> 32) as u32 657 } 658 fastrand(&self) -> u32659 fn fastrand(&self) -> u32 { 660 let mut s1 = self.one.get(); 661 let s0 = self.two.get(); 662 663 s1 ^= s1 << 17; 664 s1 = s1 ^ s0 ^ s1 >> 7 ^ s0 >> 16; 665 666 self.one.set(s0); 667 self.two.set(s1); 668 669 s0.wrapping_add(s1) 670 } 671 } 672 673 // Used by `StreamMap` thread_rng_n(n: u32) -> u32674 pub(crate) fn thread_rng_n(n: u32) -> u32 { 675 thread_local! { 676 static THREAD_RNG: FastRand = FastRand::new(loom::rand::seed()); 677 } 678 679 THREAD_RNG.with(|rng| rng.fastrand_n(n)) 680 } 681 } 682