Lines Matching +full:merge +full:- +full:stream
1 use crate::Stream;
8 /// Combine many streams into one, indexing each source stream with a unique
11 /// `StreamMap` is similar to [`StreamExt::merge`] in that it combines source
12 /// streams into a single merged stream that yields values in the order that
18 /// * Merge an arbitrary number of streams.
19 /// * Track which source stream the value was received from.
24 /// included with the value when a source stream yields a value. The key is also
25 /// used to remove the stream from the `StreamMap` before the stream has
31 /// keys must be `Unpin`. In order to insert a `!Unpin` stream into a
32 /// `StreamMap`, use [`pin!`] to pin the stream to the stack or [`Box::pin`] to
33 /// pin the stream in the heap.
49 /// To do this, use [`StreamNotifyClose`] as a wrapper to your stream.
50 /// It will return None when the stream is closed.
52 /// [`StreamExt::merge`]: crate::StreamExt::merge
63 /// use tokio_stream::{StreamExt, StreamMap, Stream};
72 /// // Convert the channels to a `Stream`.
73 /// let rx1 = Box::pin(async_stream::stream! {
77 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
79 /// let rx2 = Box::pin(async_stream::stream! {
83 /// }) as Pin<Box<dyn Stream<Item = usize> + Send>>;
114 /// // Remove the stream to prevent reading the next value
120 /// This example models a read-only client to a chat system with channels. The
128 /// use tokio_stream::{Stream, StreamExt, StreamMap};
135 /// fn commands() -> impl Stream<Item = Command> {
140 /// // Join a channel, returns a stream of messages received on the channel.
141 /// fn join(channel: &str) -> impl Stream<Item = String> + Unpin {
160 /// // stream map
174 /// // Both the `commands` stream and the `channels` stream are
190 /// let stream = StreamNotifyClose::new(tokio_stream::iter(vec![0, 1]));
192 /// map.insert(0, stream);
196 /// Some(val) => println!("got {val:?} from stream {key:?}"),
197 /// None => println!("stream {key:?} closed"),
210 /// An iterator visiting all key-value pairs in arbitrary order.
225 /// for (key, stream) in map.iter() {
226 /// println!("({}, {:?})", key, stream);
229 pub fn iter(&self) -> impl Iterator<Item = &(K, V)> { in iter()
233 /// An iterator visiting all key-value pairs mutably in arbitrary order.
248 /// for (key, stream) in map.iter_mut() {
249 /// println!("({}, {:?})", key, stream);
252 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut (K, V)> { in iter_mut()
258 /// The stream map is initially created with a capacity of `0`, so it will
268 pub fn new() -> StreamMap<K, V> { in new()
274 /// The stream map will be able to hold at least `capacity` elements without
275 /// reallocating. If `capacity` is 0, the stream map will not allocate.
284 pub fn with_capacity(capacity: usize) -> StreamMap<K, V> { in with_capacity()
309 pub fn keys(&self) -> impl Iterator<Item = &K> { in keys()
328 /// for stream in map.values() {
329 /// println!("{:?}", stream);
332 pub fn values(&self) -> impl Iterator<Item = &V> { in values()
351 /// for stream in map.values_mut() {
352 /// println!("{:?}", stream);
355 pub fn values_mut(&mut self) -> impl Iterator<Item = &mut V> { in values_mut()
372 pub fn capacity(&self) -> usize { in capacity()
388 pub fn len(&self) -> usize { in len()
404 pub fn is_empty(&self) -> bool { in is_empty()
408 /// Clears the map, removing all key-stream pairs. Keeps the allocated
425 /// Insert a key-stream pair into the map.
429 /// If the map did have this key present, the new `stream` replaces the old
430 /// one and the old stream is returned.
445 pub fn insert(&mut self, k: K, stream: V) -> Option<V> in insert()
450 self.entries.push((k, stream)); in insert()
455 …/// Removes a key from the map, returning the stream at the key if the key was previously in the m…
470 pub fn remove<Q: ?Sized>(&mut self, k: &Q) -> Option<V> in remove()
484 /// Returns `true` if the map contains a stream for the specified key.
499 pub fn contains_key<Q: ?Sized>(&self, k: &Q) -> bool in contains_key()
517 V: Stream + Unpin,
520 fn poll_next_entry(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, V::Item)>> { in poll_next_entry()
527 let (_, stream) = &mut self.entries[idx]; in poll_next_entry()
529 match Pin::new(stream).poll_next(cx) { in poll_next_entry()
540 // The stream being swapped into the current index has in poll_next_entry()
551 // If the map is empty, then the stream is complete. in poll_next_entry()
561 fn default() -> Self { in default()
566 impl<K, V> Stream for StreamMap<K, V>
569 V: Stream + Unpin,
573 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { in poll_next()
582 fn size_hint(&self) -> (usize, Option<usize>) { in size_hint()
585 for (_, stream) in &self.entries { in size_hint()
586 let hint = stream.size_hint(); in size_hint()
605 fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self { in from_iter()
640 pub(crate) fn seed() -> u64 { in seed()
645 // Hash some unique-ish data to generate some new state in seed()
655 pub(crate) fn seed() -> u64 { in seed()
663 /// Implement xorshift64+: 2 32-bit xorshift sequences added together.
675 /// Initialize a new, thread-local, fast random number generator.
676 pub(crate) fn new(seed: u64) -> FastRand { in new()
691 pub(crate) fn fastrand_n(&self, n: u32) -> u32 { in fastrand_n()
693 // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ in fastrand_n()
698 fn fastrand(&self) -> u32 { in fastrand()
713 pub(crate) fn thread_rng_n(n: u32) -> u32 { in thread_rng_n()