• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Streams
2 //!
3 //! This module contains a number of functions for working with `Stream`s,
4 //! including the `StreamExt` trait which adds methods to `Stream` types.
5 
6 use crate::future::{assert_future, Either};
7 use crate::stream::assert_stream;
8 #[cfg(feature = "alloc")]
9 use alloc::boxed::Box;
10 #[cfg(feature = "alloc")]
11 use alloc::vec::Vec;
12 use core::pin::Pin;
13 #[cfg(feature = "sink")]
14 use futures_core::stream::TryStream;
15 #[cfg(feature = "alloc")]
16 use futures_core::stream::{BoxStream, LocalBoxStream};
17 use futures_core::{
18     future::Future,
19     stream::{FusedStream, Stream},
20     task::{Context, Poll},
21 };
22 #[cfg(feature = "sink")]
23 use futures_sink::Sink;
24 
25 use crate::fns::{inspect_fn, InspectFn};
26 
27 mod chain;
28 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
29 pub use self::chain::Chain;
30 
31 mod collect;
32 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
33 pub use self::collect::Collect;
34 
35 mod unzip;
36 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
37 pub use self::unzip::Unzip;
38 
39 mod concat;
40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41 pub use self::concat::Concat;
42 
43 mod cycle;
44 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
45 pub use self::cycle::Cycle;
46 
47 mod enumerate;
48 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
49 pub use self::enumerate::Enumerate;
50 
51 mod filter;
52 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
53 pub use self::filter::Filter;
54 
55 mod filter_map;
56 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
57 pub use self::filter_map::FilterMap;
58 
59 mod flatten;
60 
61 delegate_all!(
62     /// Stream for the [`flatten`](StreamExt::flatten) method.
63     Flatten<St>(
64         flatten::Flatten<St, St::Item>
65     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| flatten::Flatten::new(x)]
66     where St: Stream
67 );
68 
69 mod fold;
70 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
71 pub use self::fold::Fold;
72 
73 #[cfg(feature = "sink")]
74 mod forward;
75 
// Declares the public `Forward<St, Si>` type via `delegate_all!`; it is only
// compiled with the "sink" feature. The wrapped `forward::Forward` takes a
// third parameter that is fixed here to `St::Ok`.
#[cfg(feature = "sink")]
delegate_all!(
    /// Future for the [`forward`](super::StreamExt::forward) method.
    #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
    Forward<St, Si>(forward::Forward<St, Si, St::Ok>):
        Debug
        + Future
        + FusedFuture
        + New[|x: St, y: Si| forward::Forward::new(x, y)]
    where
        St: TryStream
);
85 
86 mod for_each;
87 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
88 pub use self::for_each::ForEach;
89 
90 mod fuse;
91 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
92 pub use self::fuse::Fuse;
93 
94 mod into_future;
95 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
96 pub use self::into_future::StreamFuture;
97 
98 delegate_all!(
99     /// Stream for the [`inspect`](StreamExt::inspect) method.
100     Inspect<St, F>(
101         map::Map<St, InspectFn<F>>
102     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St, f: F| map::Map::new(x, inspect_fn(f))]
103 );
104 
105 mod map;
106 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
107 pub use self::map::Map;
108 
109 delegate_all!(
110     /// Stream for the [`flat_map`](StreamExt::flat_map) method.
111     FlatMap<St, U, F>(
112         flatten::Flatten<Map<St, F>, U>
113     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| flatten::Flatten::new(Map::new(x, f))]
114 );
115 
116 mod next;
117 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
118 pub use self::next::Next;
119 
120 mod select_next_some;
121 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
122 pub use self::select_next_some::SelectNextSome;
123 
124 mod peek;
125 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
126 pub use self::peek::{Peek, Peekable};
127 
128 mod skip;
129 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
130 pub use self::skip::Skip;
131 
132 mod skip_while;
133 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
134 pub use self::skip_while::SkipWhile;
135 
136 mod take;
137 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
138 pub use self::take::Take;
139 
140 mod take_while;
141 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
142 pub use self::take_while::TakeWhile;
143 
144 mod take_until;
145 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
146 pub use self::take_until::TakeUntil;
147 
148 mod then;
149 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
150 pub use self::then::Then;
151 
152 mod zip;
153 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
154 pub use self::zip::Zip;
155 
156 #[cfg(feature = "alloc")]
157 mod chunks;
158 #[cfg(feature = "alloc")]
159 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
160 pub use self::chunks::Chunks;
161 
162 #[cfg(feature = "alloc")]
163 mod ready_chunks;
164 #[cfg(feature = "alloc")]
165 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
166 pub use self::ready_chunks::ReadyChunks;
167 
168 mod scan;
169 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
170 pub use self::scan::Scan;
171 
172 cfg_target_has_atomic! {
173     #[cfg(feature = "alloc")]
174     mod buffer_unordered;
175     #[cfg(feature = "alloc")]
176     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
177     pub use self::buffer_unordered::BufferUnordered;
178 
179     #[cfg(feature = "alloc")]
180     mod buffered;
181     #[cfg(feature = "alloc")]
182     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
183     pub use self::buffered::Buffered;
184 
185     #[cfg(feature = "alloc")]
186     mod for_each_concurrent;
187     #[cfg(feature = "alloc")]
188     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
189     pub use self::for_each_concurrent::ForEachConcurrent;
190 
191     #[cfg(feature = "sink")]
192     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
193     #[cfg(feature = "alloc")]
194     mod split;
195     #[cfg(feature = "sink")]
196     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
197     #[cfg(feature = "alloc")]
198     #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
199     pub use self::split::{SplitStream, SplitSink, ReuniteError};
200 }
201 
202 #[cfg(feature = "std")]
203 mod catch_unwind;
204 #[cfg(feature = "std")]
205 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
206 pub use self::catch_unwind::CatchUnwind;
207 
208 impl<T: ?Sized> StreamExt for T where T: Stream {}
209 
210 /// An extension trait for `Stream`s that provides a variety of convenient
211 /// combinator functions.
212 pub trait StreamExt: Stream {
213     /// Creates a future that resolves to the next item in the stream.
214     ///
215     /// Note that because `next` doesn't take ownership over the stream,
216     /// the [`Stream`] type must be [`Unpin`]. If you want to use `next` with a
217     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
218     /// be done by boxing the stream using [`Box::pin`] or
219     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
220     /// crate.
221     ///
222     /// # Examples
223     ///
224     /// ```
225     /// # futures::executor::block_on(async {
226     /// use futures::stream::{self, StreamExt};
227     ///
228     /// let mut stream = stream::iter(1..=3);
229     ///
230     /// assert_eq!(stream.next().await, Some(1));
231     /// assert_eq!(stream.next().await, Some(2));
232     /// assert_eq!(stream.next().await, Some(3));
233     /// assert_eq!(stream.next().await, None);
234     /// # });
235     /// ```
next(&mut self) -> Next<'_, Self> where Self: Unpin,236     fn next(&mut self) -> Next<'_, Self>
237     where
238         Self: Unpin,
239     {
240         assert_future::<Option<Self::Item>, _>(Next::new(self))
241     }
242 
243     /// Converts this stream into a future of `(next_item, tail_of_stream)`.
244     /// If the stream terminates, then the next item is [`None`].
245     ///
246     /// The returned future can be used to compose streams and futures together
247     /// by placing everything into the "world of futures".
248     ///
249     /// Note that because `into_future` moves the stream, the [`Stream`] type
250     /// must be [`Unpin`]. If you want to use `into_future` with a
251     /// [`!Unpin`](Unpin) stream, you'll first have to pin the stream. This can
252     /// be done by boxing the stream using [`Box::pin`] or
253     /// pinning it to the stack using the `pin_mut!` macro from the `pin_utils`
254     /// crate.
255     ///
256     /// # Examples
257     ///
258     /// ```
259     /// # futures::executor::block_on(async {
260     /// use futures::stream::{self, StreamExt};
261     ///
262     /// let stream = stream::iter(1..=3);
263     ///
264     /// let (item, stream) = stream.into_future().await;
265     /// assert_eq!(Some(1), item);
266     ///
267     /// let (item, stream) = stream.into_future().await;
268     /// assert_eq!(Some(2), item);
269     /// # });
270     /// ```
into_future(self) -> StreamFuture<Self> where Self: Sized + Unpin,271     fn into_future(self) -> StreamFuture<Self>
272     where
273         Self: Sized + Unpin,
274     {
275         assert_future::<(Option<Self::Item>, Self), _>(StreamFuture::new(self))
276     }
277 
278     /// Maps this stream's items to a different type, returning a new stream of
279     /// the resulting type.
280     ///
281     /// The provided closure is executed over all elements of this stream as
282     /// they are made available. It is executed inline with calls to
283     /// [`poll_next`](Stream::poll_next).
284     ///
285     /// Note that this function consumes the stream passed into it and returns a
286     /// wrapped version of it, similar to the existing `map` methods in the
287     /// standard library.
288     ///
289     /// # Examples
290     ///
291     /// ```
292     /// # futures::executor::block_on(async {
293     /// use futures::stream::{self, StreamExt};
294     ///
295     /// let stream = stream::iter(1..=3);
296     /// let stream = stream.map(|x| x + 3);
297     ///
298     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
299     /// # });
300     /// ```
map<T, F>(self, f: F) -> Map<Self, F> where F: FnMut(Self::Item) -> T, Self: Sized,301     fn map<T, F>(self, f: F) -> Map<Self, F>
302     where
303         F: FnMut(Self::Item) -> T,
304         Self: Sized,
305     {
306         assert_stream::<T, _>(Map::new(self, f))
307     }
308 
309     /// Creates a stream which gives the current iteration count as well as
310     /// the next value.
311     ///
312     /// The stream returned yields pairs `(i, val)`, where `i` is the
313     /// current index of iteration and `val` is the value returned by the
314     /// stream.
315     ///
316     /// `enumerate()` keeps its count as a [`usize`]. If you want to count by a
317     /// different sized integer, the [`zip`](StreamExt::zip) function provides similar
318     /// functionality.
319     ///
320     /// # Overflow Behavior
321     ///
322     /// The method does no guarding against overflows, so enumerating more than
323     /// [`prim@usize::max_value()`] elements either produces the wrong result or panics. If
324     /// debug assertions are enabled, a panic is guaranteed.
325     ///
326     /// # Panics
327     ///
328     /// The returned stream might panic if the to-be-returned index would
329     /// overflow a [`usize`].
330     ///
331     /// # Examples
332     ///
333     /// ```
334     /// # futures::executor::block_on(async {
335     /// use futures::stream::{self, StreamExt};
336     ///
337     /// let stream = stream::iter(vec!['a', 'b', 'c']);
338     ///
339     /// let mut stream = stream.enumerate();
340     ///
341     /// assert_eq!(stream.next().await, Some((0, 'a')));
342     /// assert_eq!(stream.next().await, Some((1, 'b')));
343     /// assert_eq!(stream.next().await, Some((2, 'c')));
344     /// assert_eq!(stream.next().await, None);
345     /// # });
346     /// ```
enumerate(self) -> Enumerate<Self> where Self: Sized,347     fn enumerate(self) -> Enumerate<Self>
348     where
349         Self: Sized,
350     {
351         assert_stream::<(usize, Self::Item), _>(Enumerate::new(self))
352     }
353 
354     /// Filters the values produced by this stream according to the provided
355     /// asynchronous predicate.
356     ///
357     /// As values of this stream are made available, the provided predicate `f`
358     /// will be run against them. If the predicate returns a `Future` which
359     /// resolves to `true`, then the stream will yield the value, but if the
360     /// predicate returns a `Future` which resolves to `false`, then the value
361     /// will be discarded and the next value will be produced.
362     ///
363     /// Note that this function consumes the stream passed into it and returns a
364     /// wrapped version of it, similar to the existing `filter` methods in the
365     /// standard library.
366     ///
367     /// # Examples
368     ///
369     /// ```
370     /// # futures::executor::block_on(async {
371     /// use futures::future;
372     /// use futures::stream::{self, StreamExt};
373     ///
374     /// let stream = stream::iter(1..=10);
375     /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
376     ///
377     /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
378     /// # });
379     /// ```
filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,380     fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
381     where
382         F: FnMut(&Self::Item) -> Fut,
383         Fut: Future<Output = bool>,
384         Self: Sized,
385     {
386         assert_stream::<Self::Item, _>(Filter::new(self, f))
387     }
388 
389     /// Filters the values produced by this stream while simultaneously mapping
390     /// them to a different type according to the provided asynchronous closure.
391     ///
392     /// As values of this stream are made available, the provided function will
393     /// be run on them. If the future returned by the predicate `f` resolves to
394     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
395     /// it resolves to [`None`] then the next value will be produced.
396     ///
397     /// Note that this function consumes the stream passed into it and returns a
398     /// wrapped version of it, similar to the existing `filter_map` methods in
399     /// the standard library.
400     ///
401     /// # Examples
402     /// ```
403     /// # futures::executor::block_on(async {
404     /// use futures::stream::{self, StreamExt};
405     ///
406     /// let stream = stream::iter(1..=10);
407     /// let evens = stream.filter_map(|x| async move {
408     ///     if x % 2 == 0 { Some(x + 1) } else { None }
409     /// });
410     ///
411     /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
412     /// # });
413     /// ```
filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = Option<T>>, Self: Sized,414     fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
415     where
416         F: FnMut(Self::Item) -> Fut,
417         Fut: Future<Output = Option<T>>,
418         Self: Sized,
419     {
420         assert_stream::<T, _>(FilterMap::new(self, f))
421     }
422 
423     /// Computes from this stream's items new items of a different type using
424     /// an asynchronous closure.
425     ///
426     /// The provided closure `f` will be called with an `Item` once a value is
427     /// ready, it returns a future which will then be run to completion
428     /// to produce the next value on this stream.
429     ///
430     /// Note that this function consumes the stream passed into it and returns a
431     /// wrapped version of it.
432     ///
433     /// # Examples
434     ///
435     /// ```
436     /// # futures::executor::block_on(async {
437     /// use futures::stream::{self, StreamExt};
438     ///
439     /// let stream = stream::iter(1..=3);
440     /// let stream = stream.then(|x| async move { x + 3 });
441     ///
442     /// assert_eq!(vec![4, 5, 6], stream.collect::<Vec<_>>().await);
443     /// # });
444     /// ```
then<Fut, F>(self, f: F) -> Then<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future, Self: Sized,445     fn then<Fut, F>(self, f: F) -> Then<Self, Fut, F>
446     where
447         F: FnMut(Self::Item) -> Fut,
448         Fut: Future,
449         Self: Sized,
450     {
451         assert_stream::<Fut::Output, _>(Then::new(self, f))
452     }
453 
454     /// Transforms a stream into a collection, returning a
455     /// future representing the result of that computation.
456     ///
457     /// The returned future will be resolved when the stream terminates.
458     ///
459     /// # Examples
460     ///
461     /// ```
462     /// # futures::executor::block_on(async {
463     /// use futures::channel::mpsc;
464     /// use futures::stream::StreamExt;
465     /// use std::thread;
466     ///
467     /// let (tx, rx) = mpsc::unbounded();
468     ///
469     /// thread::spawn(move || {
470     ///     for i in 1..=5 {
471     ///         tx.unbounded_send(i).unwrap();
472     ///     }
473     /// });
474     ///
475     /// let output = rx.collect::<Vec<i32>>().await;
476     /// assert_eq!(output, vec![1, 2, 3, 4, 5]);
477     /// # });
478     /// ```
collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C> where Self: Sized,479     fn collect<C: Default + Extend<Self::Item>>(self) -> Collect<Self, C>
480     where
481         Self: Sized,
482     {
483         assert_future::<C, _>(Collect::new(self))
484     }
485 
486     /// Converts a stream of pairs into a future, which
487     /// resolves to pair of containers.
488     ///
489     /// `unzip()` produces a future, which resolves to two
490     /// collections: one from the left elements of the pairs,
491     /// and one from the right elements.
492     ///
493     /// The returned future will be resolved when the stream terminates.
494     ///
495     /// # Examples
496     ///
497     /// ```
498     /// # futures::executor::block_on(async {
499     /// use futures::channel::mpsc;
500     /// use futures::stream::StreamExt;
501     /// use std::thread;
502     ///
503     /// let (tx, rx) = mpsc::unbounded();
504     ///
505     /// thread::spawn(move || {
506     ///     tx.unbounded_send((1, 2)).unwrap();
507     ///     tx.unbounded_send((3, 4)).unwrap();
508     ///     tx.unbounded_send((5, 6)).unwrap();
509     /// });
510     ///
511     /// let (o1, o2): (Vec<_>, Vec<_>) = rx.unzip().await;
512     /// assert_eq!(o1, vec![1, 3, 5]);
513     /// assert_eq!(o2, vec![2, 4, 6]);
514     /// # });
515     /// ```
unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Sized + Stream<Item = (A, B)>,516     fn unzip<A, B, FromA, FromB>(self) -> Unzip<Self, FromA, FromB>
517     where
518         FromA: Default + Extend<A>,
519         FromB: Default + Extend<B>,
520         Self: Sized + Stream<Item = (A, B)>,
521     {
522         assert_future::<(FromA, FromB), _>(Unzip::new(self))
523     }
524 
525     /// Concatenate all items of a stream into a single extendable
526     /// destination, returning a future representing the end result.
527     ///
528     /// This combinator will extend the first item with the contents
529     /// of all the subsequent results of the stream. If the stream is
530     /// empty, the default value will be returned.
531     ///
532     /// Works with all collections that implement the
533     /// [`Extend`](std::iter::Extend) trait.
534     ///
535     /// # Examples
536     ///
537     /// ```
538     /// # futures::executor::block_on(async {
539     /// use futures::channel::mpsc;
540     /// use futures::stream::StreamExt;
541     /// use std::thread;
542     ///
543     /// let (tx, rx) = mpsc::unbounded();
544     ///
545     /// thread::spawn(move || {
546     ///     for i in (0..3).rev() {
547     ///         let n = i * 3;
548     ///         tx.unbounded_send(vec![n + 1, n + 2, n + 3]).unwrap();
549     ///     }
550     /// });
551     ///
552     /// let result = rx.concat().await;
553     ///
554     /// assert_eq!(result, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
555     /// # });
556     /// ```
concat(self) -> Concat<Self> where Self: Sized, Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,557     fn concat(self) -> Concat<Self>
558     where
559         Self: Sized,
560         Self::Item: Extend<<<Self as Stream>::Item as IntoIterator>::Item> + IntoIterator + Default,
561     {
562         assert_future::<Self::Item, _>(Concat::new(self))
563     }
564 
565     /// Repeats a stream endlessly.
566     ///
    /// The stream never terminates. Avoid calling `collect` (or any other
    /// combinator that buffers every item) on the returned stream: it would
    /// never finish and would keep allocating until memory is exhausted.
570     ///
571     /// # Examples
572     ///
573     /// ```
574     /// # futures::executor::block_on(async {
575     /// use futures::stream::{self, StreamExt};
576     /// let a = [1, 2, 3];
577     /// let mut s = stream::iter(a.iter()).cycle();
578     ///
579     /// assert_eq!(s.next().await, Some(&1));
580     /// assert_eq!(s.next().await, Some(&2));
581     /// assert_eq!(s.next().await, Some(&3));
582     /// assert_eq!(s.next().await, Some(&1));
583     /// assert_eq!(s.next().await, Some(&2));
584     /// assert_eq!(s.next().await, Some(&3));
585     /// assert_eq!(s.next().await, Some(&1));
586     /// # });
587     /// ```
cycle(self) -> Cycle<Self> where Self: Sized + Clone,588     fn cycle(self) -> Cycle<Self>
589     where
590         Self: Sized + Clone,
591     {
592         assert_stream::<Self::Item, _>(Cycle::new(self))
593     }
594 
595     /// Execute an accumulating asynchronous computation over a stream,
596     /// collecting all the values into one final result.
597     ///
598     /// This combinator will accumulate all values returned by this stream
599     /// according to the closure provided. The initial state is also provided to
600     /// this method and then is returned again by each execution of the closure.
601     /// Once the entire stream has been exhausted the returned future will
602     /// resolve to this value.
603     ///
604     /// # Examples
605     ///
606     /// ```
607     /// # futures::executor::block_on(async {
608     /// use futures::stream::{self, StreamExt};
609     ///
610     /// let number_stream = stream::iter(0..6);
611     /// let sum = number_stream.fold(0, |acc, x| async move { acc + x });
612     /// assert_eq!(sum.await, 15);
613     /// # });
614     /// ```
fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F> where F: FnMut(T, Self::Item) -> Fut, Fut: Future<Output = T>, Self: Sized,615     fn fold<T, Fut, F>(self, init: T, f: F) -> Fold<Self, Fut, T, F>
616     where
617         F: FnMut(T, Self::Item) -> Fut,
618         Fut: Future<Output = T>,
619         Self: Sized,
620     {
621         assert_future::<T, _>(Fold::new(self, f, init))
622     }
623 
624     /// Flattens a stream of streams into just one continuous stream.
625     ///
626     /// # Examples
627     ///
628     /// ```
629     /// # futures::executor::block_on(async {
630     /// use futures::channel::mpsc;
631     /// use futures::stream::StreamExt;
632     /// use std::thread;
633     ///
634     /// let (tx1, rx1) = mpsc::unbounded();
635     /// let (tx2, rx2) = mpsc::unbounded();
636     /// let (tx3, rx3) = mpsc::unbounded();
637     ///
638     /// thread::spawn(move || {
639     ///     tx1.unbounded_send(1).unwrap();
640     ///     tx1.unbounded_send(2).unwrap();
641     /// });
642     /// thread::spawn(move || {
643     ///     tx2.unbounded_send(3).unwrap();
644     ///     tx2.unbounded_send(4).unwrap();
645     /// });
646     /// thread::spawn(move || {
647     ///     tx3.unbounded_send(rx1).unwrap();
648     ///     tx3.unbounded_send(rx2).unwrap();
649     /// });
650     ///
651     /// let output = rx3.flatten().collect::<Vec<i32>>().await;
652     /// assert_eq!(output, vec![1, 2, 3, 4]);
653     /// # });
654     /// ```
flatten(self) -> Flatten<Self> where Self::Item: Stream, Self: Sized,655     fn flatten(self) -> Flatten<Self>
656     where
657         Self::Item: Stream,
658         Self: Sized,
659     {
660         assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
661     }
662 
663     /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
664     ///
665     /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
666     /// you would have to chain combinators like `.map(f).flatten()` while this
667     /// combinator provides ability to write `.flat_map(f)` instead of chaining.
668     ///
    /// The provided closure, which produces the inner streams, is executed on each
    /// element of the stream once the previous inner stream has terminated and the
    /// next stream item is available.
671     ///
672     /// Note that this function consumes the stream passed into it and returns a
673     /// wrapped version of it, similar to the existing `flat_map` methods in the
674     /// standard library.
675     ///
676     /// # Examples
677     ///
678     /// ```
679     /// # futures::executor::block_on(async {
680     /// use futures::stream::{self, StreamExt};
681     ///
682     /// let stream = stream::iter(1..=3);
683     /// let stream = stream.flat_map(|x| stream::iter(vec![x + 3; x]));
684     ///
685     /// assert_eq!(vec![4, 5, 5, 6, 6, 6], stream.collect::<Vec<_>>().await);
686     /// # });
687     /// ```
flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where F: FnMut(Self::Item) -> U, U: Stream, Self: Sized,688     fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
689     where
690         F: FnMut(Self::Item) -> U,
691         U: Stream,
692         Self: Sized,
693     {
694         assert_stream::<U::Item, _>(FlatMap::new(self, f))
695     }
696 
697     /// Combinator similar to [`StreamExt::fold`] that holds internal state
698     /// and produces a new stream.
699     ///
700     /// Accepts initial state and closure which will be applied to each element
701     /// of the stream until provided closure returns `None`. Once `None` is
702     /// returned, stream will be terminated.
703     ///
704     /// # Examples
705     ///
706     /// ```
707     /// # futures::executor::block_on(async {
708     /// use futures::future;
709     /// use futures::stream::{self, StreamExt};
710     ///
711     /// let stream = stream::iter(1..=10);
712     ///
713     /// let stream = stream.scan(0, |state, x| {
714     ///     *state += x;
715     ///     future::ready(if *state < 10 { Some(x) } else { None })
716     /// });
717     ///
718     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
719     /// # });
720     /// ```
scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F> where F: FnMut(&mut S, Self::Item) -> Fut, Fut: Future<Output = Option<B>>, Self: Sized,721     fn scan<S, B, Fut, F>(self, initial_state: S, f: F) -> Scan<Self, S, Fut, F>
722     where
723         F: FnMut(&mut S, Self::Item) -> Fut,
724         Fut: Future<Output = Option<B>>,
725         Self: Sized,
726     {
727         assert_stream::<B, _>(Scan::new(self, initial_state, f))
728     }
729 
730     /// Skip elements on this stream while the provided asynchronous predicate
731     /// resolves to `true`.
732     ///
733     /// This function, like `Iterator::skip_while`, will skip elements on the
734     /// stream until the predicate `f` resolves to `false`. Once one element
735     /// returns `false`, all future elements will be returned from the underlying
736     /// stream.
737     ///
738     /// # Examples
739     ///
740     /// ```
741     /// # futures::executor::block_on(async {
742     /// use futures::future;
743     /// use futures::stream::{self, StreamExt};
744     ///
745     /// let stream = stream::iter(1..=10);
746     ///
747     /// let stream = stream.skip_while(|x| future::ready(*x <= 5));
748     ///
749     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
750     /// # });
751     /// ```
skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,752     fn skip_while<Fut, F>(self, f: F) -> SkipWhile<Self, Fut, F>
753     where
754         F: FnMut(&Self::Item) -> Fut,
755         Fut: Future<Output = bool>,
756         Self: Sized,
757     {
758         assert_stream::<Self::Item, _>(SkipWhile::new(self, f))
759     }
760 
761     /// Take elements from this stream while the provided asynchronous predicate
762     /// resolves to `true`.
763     ///
764     /// This function, like `Iterator::take_while`, will take elements from the
765     /// stream until the predicate `f` resolves to `false`. Once one element
766     /// returns `false`, it will always return that the stream is done.
767     ///
768     /// # Examples
769     ///
770     /// ```
771     /// # futures::executor::block_on(async {
772     /// use futures::future;
773     /// use futures::stream::{self, StreamExt};
774     ///
775     /// let stream = stream::iter(1..=10);
776     ///
777     /// let stream = stream.take_while(|x| future::ready(*x <= 5));
778     ///
779     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
780     /// # });
781     /// ```
take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F> where F: FnMut(&Self::Item) -> Fut, Fut: Future<Output = bool>, Self: Sized,782     fn take_while<Fut, F>(self, f: F) -> TakeWhile<Self, Fut, F>
783     where
784         F: FnMut(&Self::Item) -> Fut,
785         Fut: Future<Output = bool>,
786         Self: Sized,
787     {
788         assert_stream::<Self::Item, _>(TakeWhile::new(self, f))
789     }
790 
791     /// Take elements from this stream until the provided future resolves.
792     ///
793     /// This function will take elements from the stream until the provided
794     /// stopping future `fut` resolves. Once the `fut` future becomes ready,
795     /// this stream combinator will always return that the stream is done.
796     ///
797     /// The stopping future may return any type. Once the stream is stopped
798     /// the result of the stopping future may be accessed with `TakeUntil::take_result()`.
799     /// The stream may also be resumed with `TakeUntil::take_future()`.
800     /// See the documentation of [`TakeUntil`] for more information.
801     ///
802     /// # Examples
803     ///
804     /// ```
805     /// # futures::executor::block_on(async {
806     /// use futures::future;
807     /// use futures::stream::{self, StreamExt};
808     /// use futures::task::Poll;
809     ///
810     /// let stream = stream::iter(1..=10);
811     ///
812     /// let mut i = 0;
813     /// let stop_fut = future::poll_fn(|_cx| {
814     ///     i += 1;
815     ///     if i <= 5 {
816     ///         Poll::Pending
817     ///     } else {
818     ///         Poll::Ready(())
819     ///     }
820     /// });
821     ///
822     /// let stream = stream.take_until(stop_fut);
823     ///
824     /// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
825     /// # });
826     /// ```
take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut> where Fut: Future, Self: Sized,827     fn take_until<Fut>(self, fut: Fut) -> TakeUntil<Self, Fut>
828     where
829         Fut: Future,
830         Self: Sized,
831     {
832         assert_stream::<Self::Item, _>(TakeUntil::new(self, fut))
833     }
834 
835     /// Runs this stream to completion, executing the provided asynchronous
836     /// closure for each element on the stream.
837     ///
838     /// The closure provided will be called for each item this stream produces,
839     /// yielding a future. That future will then be executed to completion
840     /// before moving on to the next item.
841     ///
842     /// The returned value is a `Future` where the `Output` type is `()`; it is
843     /// executed entirely for its side effects.
844     ///
845     /// To process each item in the stream and produce another stream instead
846     /// of a single future, use `then` instead.
847     ///
848     /// # Examples
849     ///
850     /// ```
851     /// # futures::executor::block_on(async {
852     /// use futures::future;
853     /// use futures::stream::{self, StreamExt};
854     ///
855     /// let mut x = 0;
856     ///
857     /// {
858     ///     let fut = stream::repeat(1).take(3).for_each(|item| {
859     ///         x += item;
860     ///         future::ready(())
861     ///     });
862     ///     fut.await;
863     /// }
864     ///
865     /// assert_eq!(x, 3);
866     /// # });
867     /// ```
for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F> where F: FnMut(Self::Item) -> Fut, Fut: Future<Output = ()>, Self: Sized,868     fn for_each<Fut, F>(self, f: F) -> ForEach<Self, Fut, F>
869     where
870         F: FnMut(Self::Item) -> Fut,
871         Fut: Future<Output = ()>,
872         Self: Sized,
873     {
874         assert_future::<(), _>(ForEach::new(self, f))
875     }
876 
877     /// Runs this stream to completion, executing the provided asynchronous
878     /// closure for each element on the stream concurrently as elements become
879     /// available.
880     ///
881     /// This is similar to [`StreamExt::for_each`], but the futures
882     /// produced by the closure are run concurrently (but not in parallel--
883     /// this combinator does not introduce any threads).
884     ///
885     /// The closure provided will be called for each item this stream produces,
886     /// yielding a future. That future will then be executed to completion
887     /// concurrently with the other futures produced by the closure.
888     ///
889     /// The first argument is an optional limit on the number of concurrent
890     /// futures. If this limit is not `None`, no more than `limit` futures
891     /// will be run concurrently. The `limit` argument is of type
892     /// `Into<Option<usize>>`, and so can be provided as either `None`,
893     /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
894     /// no limit at all, and will have the same result as passing in `None`.
895     ///
896     /// This method is only available when the `std` or `alloc` feature of this
897     /// library is activated, and it is activated by default.
898     ///
899     /// # Examples
900     ///
901     /// ```
902     /// # futures::executor::block_on(async {
903     /// use futures::channel::oneshot;
904     /// use futures::stream::{self, StreamExt};
905     ///
906     /// let (tx1, rx1) = oneshot::channel();
907     /// let (tx2, rx2) = oneshot::channel();
908     /// let (tx3, rx3) = oneshot::channel();
909     ///
910     /// let fut = stream::iter(vec![rx1, rx2, rx3]).for_each_concurrent(
911     ///     /* limit */ 2,
912     ///     |rx| async move {
913     ///         rx.await.unwrap();
914     ///     }
915     /// );
916     /// tx1.send(()).unwrap();
917     /// tx2.send(()).unwrap();
918     /// tx3.send(()).unwrap();
919     /// fut.await;
920     /// # })
921     /// ```
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn for_each_concurrent<Fut, F>(
    self,
    limit: impl Into<Option<usize>>,
    f: F,
) -> ForEachConcurrent<Self, Fut, F>
where
    F: FnMut(Self::Item) -> Fut,
    Fut: Future<Output = ()>,
    Self: Sized,
{
    // Normalise the caller-supplied limit (`None`, `Some(n)` or a bare `n`)
    // into an `Option<usize>` before constructing the combinator.
    let limit: Option<usize> = limit.into();
    let driver = ForEachConcurrent::new(self, limit, f);
    assert_future::<(), _>(driver)
}
936 
937     /// Creates a new stream of at most `n` items of the underlying stream.
938     ///
939     /// Once `n` items have been yielded from this stream then it will always
940     /// return that the stream is done.
941     ///
942     /// # Examples
943     ///
944     /// ```
945     /// # futures::executor::block_on(async {
946     /// use futures::stream::{self, StreamExt};
947     ///
948     /// let stream = stream::iter(1..=10).take(3);
949     ///
950     /// assert_eq!(vec![1, 2, 3], stream.collect::<Vec<_>>().await);
951     /// # });
952     /// ```
take(self, n: usize) -> Take<Self> where Self: Sized,953     fn take(self, n: usize) -> Take<Self>
954     where
955         Self: Sized,
956     {
957         assert_stream::<Self::Item, _>(Take::new(self, n))
958     }
959 
960     /// Creates a new stream which skips `n` items of the underlying stream.
961     ///
962     /// Once `n` items have been skipped from this stream then it will always
963     /// return the remaining items on this stream.
964     ///
965     /// # Examples
966     ///
967     /// ```
968     /// # futures::executor::block_on(async {
969     /// use futures::stream::{self, StreamExt};
970     ///
971     /// let stream = stream::iter(1..=10).skip(5);
972     ///
973     /// assert_eq!(vec![6, 7, 8, 9, 10], stream.collect::<Vec<_>>().await);
974     /// # });
975     /// ```
skip(self, n: usize) -> Skip<Self> where Self: Sized,976     fn skip(self, n: usize) -> Skip<Self>
977     where
978         Self: Sized,
979     {
980         assert_stream::<Self::Item, _>(Skip::new(self, n))
981     }
982 
983     /// Fuse a stream such that [`poll_next`](Stream::poll_next) will never
984     /// again be called once it has finished. This method can be used to turn
985     /// any `Stream` into a `FusedStream`.
986     ///
987     /// Normally, once a stream has returned [`None`] from
988     /// [`poll_next`](Stream::poll_next) any further calls could exhibit bad
989     /// behavior such as blocking forever, panicking, or never returning. If it is known
990     /// that [`poll_next`](Stream::poll_next) may be called after the stream
991     /// has already finished, then this method can be used to ensure that it has
992     /// defined semantics.
993     ///
994     /// The [`poll_next`](Stream::poll_next) method of a `fuse`d stream
995     /// is guaranteed to return [`None`] after the underlying stream has
996     /// finished.
997     ///
998     /// # Examples
999     ///
1000     /// ```
1001     /// use futures::executor::block_on_stream;
1002     /// use futures::stream::{self, StreamExt};
1003     /// use futures::task::Poll;
1004     ///
1005     /// let mut x = 0;
1006     /// let stream = stream::poll_fn(|_| {
1007     ///     x += 1;
1008     ///     match x {
1009     ///         0..=2 => Poll::Ready(Some(x)),
1010     ///         3 => Poll::Ready(None),
1011     ///         _ => panic!("should not happen")
1012     ///     }
1013     /// }).fuse();
1014     ///
1015     /// let mut iter = block_on_stream(stream);
1016     /// assert_eq!(Some(1), iter.next());
1017     /// assert_eq!(Some(2), iter.next());
1018     /// assert_eq!(None, iter.next());
1019     /// assert_eq!(None, iter.next());
1020     /// // ...
1021     /// ```
fuse(self) -> Fuse<Self> where Self: Sized,1022     fn fuse(self) -> Fuse<Self>
1023     where
1024         Self: Sized,
1025     {
1026         assert_stream::<Self::Item, _>(Fuse::new(self))
1027     }
1028 
1029     /// Borrows a stream, rather than consuming it.
1030     ///
1031     /// This is useful to allow applying stream adaptors while still retaining
1032     /// ownership of the original stream.
1033     ///
1034     /// # Examples
1035     ///
1036     /// ```
1037     /// # futures::executor::block_on(async {
1038     /// use futures::stream::{self, StreamExt};
1039     ///
1040     /// let mut stream = stream::iter(1..5);
1041     ///
1042     /// let sum = stream.by_ref()
1043     ///                 .take(2)
1044     ///                 .fold(0, |a, b| async move { a + b })
1045     ///                 .await;
1046     /// assert_eq!(sum, 3);
1047     ///
1048     /// // You can use the stream again
1049     /// let sum = stream.take(2)
1050     ///                 .fold(0, |a, b| async move { a + b })
1051     ///                 .await;
1052     /// assert_eq!(sum, 7);
1053     /// # });
1054     /// ```
by_ref(&mut self) -> &mut Self1055     fn by_ref(&mut self) -> &mut Self {
1056         self
1057     }
1058 
1059     /// Catches unwinding panics while polling the stream.
1060     ///
1061     /// Caught panic (if any) will be the last element of the resulting stream.
1062     ///
1063     /// In general, panics within a stream can propagate all the way out to the
1064     /// task level. This combinator makes it possible to halt unwinding within
1065     /// the stream itself. It's most commonly used within task executors. This
1066     /// method should not be used for error handling.
1067     ///
1068     /// Note that this method requires the `UnwindSafe` bound from the standard
1069     /// library. This isn't always applied automatically, and the standard
1070     /// library provides an `AssertUnwindSafe` wrapper type to apply it
1071     /// after the fact. To assist using this method, the [`Stream`] trait is
1072     /// also implemented for `AssertUnwindSafe<St>` where `St` implements
1073     /// [`Stream`].
1074     ///
1075     /// This method is only available when the `std` feature of this
1076     /// library is activated, and it is activated by default.
1077     ///
1078     /// # Examples
1079     ///
1080     /// ```
1081     /// # futures::executor::block_on(async {
1082     /// use futures::stream::{self, StreamExt};
1083     ///
1084     /// let stream = stream::iter(vec![Some(10), None, Some(11)]);
1085     /// // Panic on second element
1086     /// let stream_panicking = stream.map(|o| o.unwrap());
1087     /// // Collect all the results
1088     /// let stream = stream_panicking.catch_unwind();
1089     ///
1090     /// let results: Vec<Result<i32, _>> = stream.collect().await;
1091     /// match results[0] {
1092     ///     Ok(10) => {}
1093     ///     _ => panic!("unexpected result!"),
1094     /// }
1095     /// assert!(results[1].is_err());
1096     /// assert_eq!(results.len(), 2);
1097     /// # });
1098     /// ```
#[cfg(feature = "std")]
fn catch_unwind(self) -> CatchUnwind<Self>
where
    Self: Sized + std::panic::UnwindSafe,
{
    // Unlike the sibling adaptors, no item type is spelled out here: both
    // type parameters of `assert_stream` are left to inference.
    let guarded = CatchUnwind::new(self);
    assert_stream(guarded)
}
1106 
1107     /// Wrap the stream in a Box, pinning it.
1108     ///
1109     /// This method is only available when the `std` or `alloc` feature of this
1110     /// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn boxed<'a>(self) -> BoxStream<'a, Self::Item>
where
    Self: Sized + Send + 'a,
{
    // The annotated binding is where the pinned box is coerced to the
    // boxed stream alias named in the return type.
    let pinned: BoxStream<'a, Self::Item> = Box::pin(self);
    assert_stream::<Self::Item, _>(pinned)
}
1118 
1119     /// Wrap the stream in a Box, pinning it.
1120     ///
1121     /// Similar to `boxed`, but without the `Send` requirement.
1122     ///
1123     /// This method is only available when the `std` or `alloc` feature of this
1124     /// library is activated, and it is activated by default.
#[cfg(feature = "alloc")]
fn boxed_local<'a>(self) -> LocalBoxStream<'a, Self::Item>
where
    Self: Sized + 'a,
{
    // Same shape as `boxed`, except that `Self` is not required to be
    // `Send` and the result is the local boxed stream alias.
    let pinned: LocalBoxStream<'a, Self::Item> = Box::pin(self);
    assert_stream::<Self::Item, _>(pinned)
}
1132 
1133     /// An adaptor for creating a buffered list of pending futures.
1134     ///
1135     /// If this stream's item can be converted into a future, then this adaptor
1136     /// will buffer up to at most `n` futures and then return the outputs in the
1137     /// same order as the underlying stream. No more than `n` futures will be
1138     /// buffered at any point in time, and less than `n` may also be buffered
1139     /// depending on the state of each future.
1140     ///
1141     /// The returned stream will be a stream of each future's output.
1142     ///
1143     /// This method is only available when the `std` or `alloc` feature of this
1144     /// library is activated, and it is activated by default.
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffered(self, n: usize) -> Buffered<Self>
where
    Self::Item: Future,
    Self: Sized,
{
    // The resulting stream yields each future's output, so that output
    // type (not `Self::Item`) is what the turbofish names.
    let buffering = Buffered::new(self, n);
    assert_stream::<<Self::Item as Future>::Output, _>(buffering)
}
1154 
1155     /// An adaptor for creating a buffered list of pending futures (unordered).
1156     ///
1157     /// If this stream's item can be converted into a future, then this adaptor
1158     /// will buffer up to `n` futures and then return the outputs in the order
1159     /// in which they complete. No more than `n` futures will be buffered at
1160     /// any point in time, and less than `n` may also be buffered depending on
1161     /// the state of each future.
1162     ///
1163     /// The returned stream will be a stream of each future's output.
1164     ///
1165     /// This method is only available when the `std` or `alloc` feature of this
1166     /// library is activated, and it is activated by default.
1167     ///
1168     /// # Examples
1169     ///
1170     /// ```
1171     /// # futures::executor::block_on(async {
1172     /// use futures::channel::oneshot;
1173     /// use futures::stream::{self, StreamExt};
1174     ///
1175     /// let (send_one, recv_one) = oneshot::channel();
1176     /// let (send_two, recv_two) = oneshot::channel();
1177     ///
1178     /// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1179     /// let mut buffered = stream_of_futures.buffer_unordered(10);
1180     ///
1181     /// send_two.send(2i32)?;
1182     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1183     ///
1184     /// send_one.send(1i32)?;
1185     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1186     ///
1187     /// assert_eq!(buffered.next().await, None);
1188     /// # Ok::<(), i32>(()) }).unwrap();
1189     /// ```
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn buffer_unordered(self, n: usize) -> BufferUnordered<Self>
where
    Self::Item: Future,
    Self: Sized,
{
    // The resulting stream yields each future's output, so that output
    // type (not `Self::Item`) is what the turbofish names.
    let buffering = BufferUnordered::new(self, n);
    assert_stream::<<Self::Item as Future>::Output, _>(buffering)
}
1199 
1200     /// An adapter for zipping two streams together.
1201     ///
1202     /// The zipped stream waits for both streams to produce an item, and then
1203     /// returns that pair. If either stream ends then the zipped stream will
1204     /// also end.
1205     ///
1206     /// # Examples
1207     ///
1208     /// ```
1209     /// # futures::executor::block_on(async {
1210     /// use futures::stream::{self, StreamExt};
1211     ///
1212     /// let stream1 = stream::iter(1..=3);
1213     /// let stream2 = stream::iter(5..=10);
1214     ///
1215     /// let vec = stream1.zip(stream2)
1216     ///                  .collect::<Vec<_>>()
1217     ///                  .await;
1218     /// assert_eq!(vec![(1, 5), (2, 6), (3, 7)], vec);
1219     /// # });
1220     /// ```
1221     ///
zip<St>(self, other: St) -> Zip<Self, St> where St: Stream, Self: Sized,1222     fn zip<St>(self, other: St) -> Zip<Self, St>
1223     where
1224         St: Stream,
1225         Self: Sized,
1226     {
1227         assert_stream::<(Self::Item, St::Item), _>(Zip::new(self, other))
1228     }
1229 
1230     /// Adapter for chaining two streams.
1231     ///
1232     /// The resulting stream emits elements from the first stream, and when
1233     /// the first stream reaches the end, emits the elements from the second stream.
1234     ///
1235     /// ```
1236     /// # futures::executor::block_on(async {
1237     /// use futures::stream::{self, StreamExt};
1238     ///
1239     /// let stream1 = stream::iter(vec![Ok(10), Err(false)]);
1240     /// let stream2 = stream::iter(vec![Err(true), Ok(20)]);
1241     ///
1242     /// let stream = stream1.chain(stream2);
1243     ///
1244     /// let result: Vec<_> = stream.collect().await;
1245     /// assert_eq!(result, vec![
1246     ///     Ok(10),
1247     ///     Err(false),
1248     ///     Err(true),
1249     ///     Ok(20),
1250     /// ]);
1251     /// # });
1252     /// ```
chain<St>(self, other: St) -> Chain<Self, St> where St: Stream<Item = Self::Item>, Self: Sized,1253     fn chain<St>(self, other: St) -> Chain<Self, St>
1254     where
1255         St: Stream<Item = Self::Item>,
1256         Self: Sized,
1257     {
1258         assert_stream::<Self::Item, _>(Chain::new(self, other))
1259     }
1260 
1261     /// Creates a new stream which exposes a `peek` method.
1262     ///
1263     /// Calling `peek` returns a reference to the next item in the stream.
peekable(self) -> Peekable<Self> where Self: Sized,1264     fn peekable(self) -> Peekable<Self>
1265     where
1266         Self: Sized,
1267     {
1268         assert_stream::<Self::Item, _>(Peekable::new(self))
1269     }
1270 
1271     /// An adaptor for chunking up items of the stream inside a vector.
1272     ///
1273     /// This combinator will attempt to pull items from this stream and buffer
1274     /// them into a local vector. At most `capacity` items will get buffered
1275     /// before they're yielded from the returned stream.
1276     ///
1277     /// Note that the vectors returned from this stream may not always have
1278     /// `capacity` elements. If the underlying stream ended and only a partial
1279     /// vector was created, it'll be returned. Additionally if an error happens
1280     /// from the underlying stream then the currently buffered items will be
1281     /// yielded.
1282     ///
1283     /// This method is only available when the `std` or `alloc` feature of this
1284     /// library is activated, and it is activated by default.
1285     ///
1286     /// # Panics
1287     ///
1288     /// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn chunks(self, capacity: usize) -> Chunks<Self>
where
    Self: Sized,
{
    // The adaptor yields vectors of the source items, hence the
    // `Vec<Self::Item>` in the turbofish.
    let chunked = Chunks::new(self, capacity);
    assert_stream::<Vec<Self::Item>, _>(chunked)
}
1296 
1297     /// An adaptor for chunking up ready items of the stream inside a vector.
1298     ///
1299     /// This combinator will attempt to pull ready items from this stream and
1300     /// buffer them into a local vector. At most `capacity` items will get
1301     /// buffered before they're yielded from the returned stream. If the underlying
1302     /// stream returns `Poll::Pending`, and the collected chunk is not empty, it will
1303     /// be immediately returned.
1304     ///
1305     /// If the underlying stream ended and only a partial vector was created,
1306     /// it'll be returned. Additionally if an error happens from the underlying
1307     /// stream then the currently buffered items will be yielded.
1308     ///
1309     /// This method is only available when the `std` or `alloc` feature of this
1310     /// library is activated, and it is activated by default.
1311     ///
1312     /// # Panics
1313     ///
1314     /// This method will panic if `capacity` is zero.
#[cfg(feature = "alloc")]
fn ready_chunks(self, capacity: usize) -> ReadyChunks<Self>
where
    Self: Sized,
{
    // The adaptor yields vectors of the source items, hence the
    // `Vec<Self::Item>` in the turbofish.
    let chunked = ReadyChunks::new(self, capacity);
    assert_stream::<Vec<Self::Item>, _>(chunked)
}
1322 
1323     /// A future that completes after the given stream has been fully processed
1324     /// into the sink and the sink has been flushed and closed.
1325     ///
1326     /// This future will drive the stream to keep producing items until it is
1327     /// exhausted, sending each item to the sink. It will complete once the
1328     /// stream is exhausted, the sink has received and flushed all items, and
1329     /// the sink is closed. Note that neither the original stream nor provided
1330     /// sink will be output by this future. Pass the sink by `Pin<&mut S>`
1331     /// (for example, via `forward(&mut sink)` inside an `async` fn/block) in
1332     /// order to preserve access to the `Sink`.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn forward<S>(self, sink: S) -> Forward<Self, S>
where
    S: Sink<Self::Ok, Error = Self::Error>,
    Self: TryStream + Sized,
    // Candidate stricter bound, currently disabled:
    // Self: TryStream + Sized + Stream<Item = Result<<Self as TryStream>::Ok, <Self as TryStream>::Error>>,
{
    // TODO: type mismatch resolving `<Self as futures_core::Stream>::Item == std::result::Result<<Self as futures_core::TryStream>::Ok, <Self as futures_core::TryStream>::Error>`
    // Until that is resolved, the check below stays disabled:
    // assert_future::<Result<(), Self::Error>, _>(Forward::new(self, sink))
    //
    // The explicit annotation at least pins the concrete future type.
    let forwarding: Forward<Self, S> = Forward::new(self, sink);
    forwarding
}
1345 
1346     /// Splits this `Stream + Sink` object into separate `Sink` and `Stream`
1347     /// objects.
1348     ///
1349     /// This can be useful when you want to split ownership between tasks, or
1350     /// allow direct interaction between the two objects (e.g. via
1351     /// `Sink::send_all`).
1352     ///
1353     /// This method is only available when the `std` or `alloc` feature of this
1354     /// library is activated, and it is activated by default.
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
#[cfg(feature = "alloc")]
fn split<Item>(self) -> (SplitSink<Self, Item>, SplitStream<Self>)
where
    Self: Sink<Item> + Sized,
{
    let (sink_half, stream_half) = split::split(self);

    // Each half gets its own check: the sink half against `Item` and
    // `Self::Error`, the stream half against `Self::Item`.
    let sink_half = crate::sink::assert_sink::<Item, Self::Error, _>(sink_half);
    let stream_half = assert_stream::<Self::Item, _>(stream_half);

    (sink_half, stream_half)
}
1369 
1370     /// Do something with each item of this stream, afterwards passing it on.
1371     ///
1372     /// This is similar to the `Iterator::inspect` method in the standard
1373     /// library where it allows easily inspecting each value as it passes
1374     /// through the stream, for example to debug what's going on.
inspect<F>(self, f: F) -> Inspect<Self, F> where F: FnMut(&Self::Item), Self: Sized,1375     fn inspect<F>(self, f: F) -> Inspect<Self, F>
1376     where
1377         F: FnMut(&Self::Item),
1378         Self: Sized,
1379     {
1380         assert_stream::<Self::Item, _>(Inspect::new(self, f))
1381     }
1382 
1383     /// Wrap this stream in an `Either` stream, making it the left-hand variant
1384     /// of that `Either`.
1385     ///
1386     /// This can be used in combination with the `right_stream` method to write `if`
1387     /// statements that evaluate to different streams in different branches.
left_stream<B>(self) -> Either<Self, B> where B: Stream<Item = Self::Item>, Self: Sized,1388     fn left_stream<B>(self) -> Either<Self, B>
1389     where
1390         B: Stream<Item = Self::Item>,
1391         Self: Sized,
1392     {
1393         assert_stream::<Self::Item, _>(Either::Left(self))
1394     }
1395 
1396     /// Wrap this stream in an `Either` stream, making it the right-hand variant
1397     /// of that `Either`.
1398     ///
1399     /// This can be used in combination with the `left_stream` method to write `if`
1400     /// statements that evaluate to different streams in different branches.
right_stream<B>(self) -> Either<B, Self> where B: Stream<Item = Self::Item>, Self: Sized,1401     fn right_stream<B>(self) -> Either<B, Self>
1402     where
1403         B: Stream<Item = Self::Item>,
1404         Self: Sized,
1405     {
1406         assert_stream::<Self::Item, _>(Either::Right(self))
1407     }
1408 
1409     /// A convenience method for calling [`Stream::poll_next`] on [`Unpin`]
1410     /// stream types.
poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin,1411     fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
1412     where
1413         Self: Unpin,
1414     {
1415         Pin::new(self).poll_next(cx)
1416     }
1417 
1418     /// Returns a [`Future`] that resolves when the next item in this stream is
1419     /// ready.
1420     ///
1421     /// This is similar to the [`next`][StreamExt::next] method, but it won't
1422     /// resolve to [`None`] if used on an empty [`Stream`]. Instead, the
1423     /// returned future type will return `true` from
1424     /// [`FusedFuture::is_terminated`][] when the [`Stream`] is empty, allowing
1425     /// [`select_next_some`][StreamExt::select_next_some] to be easily used with
1426     /// the [`select!`] macro.
1427     ///
1428     /// If the future is polled after this [`Stream`] is empty it will panic.
1429     /// Using the future with a [`FusedFuture`][]-aware primitive like the
1430     /// [`select!`] macro will prevent this.
1431     ///
1432     /// [`FusedFuture`]: futures_core::future::FusedFuture
1433     /// [`FusedFuture::is_terminated`]: futures_core::future::FusedFuture::is_terminated
1434     ///
1435     /// # Examples
1436     ///
1437     /// ```
1438     /// # futures::executor::block_on(async {
1439     /// use futures::{future, select};
1440     /// use futures::stream::{StreamExt, FuturesUnordered};
1441     ///
1442     /// let mut fut = future::ready(1);
1443     /// let mut async_tasks = FuturesUnordered::new();
1444     /// let mut total = 0;
1445     /// loop {
1446     ///     select! {
1447     ///         num = fut => {
1448     ///             // First, the `ready` future completes.
1449     ///             total += num;
1450     ///             // Then we spawn a new task onto `async_tasks`,
1451     ///             async_tasks.push(async { 5 });
1452     ///         },
1453     ///         // On the next iteration of the loop, the task we spawned
1454     ///         // completes.
1455     ///         num = async_tasks.select_next_some() => {
1456     ///             total += num;
1457     ///         }
1458     ///         // Finally, both the `ready` future and `async_tasks` have
1459     ///         // finished, so we enter the `complete` branch.
1460     ///         complete => break,
1461     ///     }
1462     /// }
1463     /// assert_eq!(total, 6);
1464     /// # });
1465     /// ```
select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream,1466     fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
1467     where
1468         Self: Unpin + FusedStream,
1469     {
1470         assert_future::<Self::Item, _>(SelectNextSome::new(self))
1471     }
1472 }
1473