• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Streams
2 //!
3 //! This module contains a number of functions for working with `Stream`s
4 //! that return `Result`s, allowing for short-circuiting computations.
5 
6 #[cfg(feature = "compat")]
7 use crate::compat::Compat;
8 use crate::fns::{
9     inspect_err_fn, inspect_ok_fn, into_fn, map_err_fn, map_ok_fn, InspectErrFn, InspectOkFn,
10     IntoFn, MapErrFn, MapOkFn,
11 };
12 use crate::future::assert_future;
13 use crate::stream::assert_stream;
14 use crate::stream::{Inspect, Map};
15 #[cfg(feature = "alloc")]
16 use alloc::vec::Vec;
17 use core::pin::Pin;
18 use futures_core::{
19     future::{Future, TryFuture},
20     stream::TryStream,
21     task::{Context, Poll},
22 };
23 
24 mod and_then;
25 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
26 pub use self::and_then::AndThen;
27 
28 delegate_all!(
29     /// Stream for the [`err_into`](super::TryStreamExt::err_into) method.
30     ErrInto<St, E>(
31         MapErr<St, IntoFn<E>>
32     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (.)] + New[|x: St| MapErr::new(x, into_fn())]
33 );
34 
35 delegate_all!(
36     /// Stream for the [`inspect_ok`](super::TryStreamExt::inspect_ok) method.
37     InspectOk<St, F>(
38         Inspect<IntoStream<St>, InspectOkFn<F>>
39     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_ok_fn(f))]
40 );
41 
42 delegate_all!(
43     /// Stream for the [`inspect_err`](super::TryStreamExt::inspect_err) method.
44     InspectErr<St, F>(
45         Inspect<IntoStream<St>, InspectErrFn<F>>
46     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Inspect::new(IntoStream::new(x), inspect_err_fn(f))]
47 );
48 
49 mod into_stream;
50 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
51 pub use self::into_stream::IntoStream;
52 
53 delegate_all!(
54     /// Stream for the [`map_ok`](super::TryStreamExt::map_ok) method.
55     MapOk<St, F>(
56         Map<IntoStream<St>, MapOkFn<F>>
57     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_ok_fn(f))]
58 );
59 
60 delegate_all!(
61     /// Stream for the [`map_err`](super::TryStreamExt::map_err) method.
62     MapErr<St, F>(
63         Map<IntoStream<St>, MapErrFn<F>>
64     ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, f: F| Map::new(IntoStream::new(x), map_err_fn(f))]
65 );
66 
67 mod or_else;
68 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
69 pub use self::or_else::OrElse;
70 
71 mod try_next;
72 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
73 pub use self::try_next::TryNext;
74 
75 mod try_for_each;
76 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
77 pub use self::try_for_each::TryForEach;
78 
79 mod try_filter;
80 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
81 pub use self::try_filter::TryFilter;
82 
83 mod try_filter_map;
84 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
85 pub use self::try_filter_map::TryFilterMap;
86 
87 mod try_flatten;
88 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
89 pub use self::try_flatten::TryFlatten;
90 
91 mod try_collect;
92 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
93 pub use self::try_collect::TryCollect;
94 
95 mod try_concat;
96 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
97 pub use self::try_concat::TryConcat;
98 
99 #[cfg(feature = "alloc")]
100 mod try_chunks;
101 #[cfg(feature = "alloc")]
102 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
103 pub use self::try_chunks::{TryChunks, TryChunksError};
104 
105 mod try_fold;
106 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
107 pub use self::try_fold::TryFold;
108 
109 mod try_unfold;
110 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
111 pub use self::try_unfold::{try_unfold, TryUnfold};
112 
113 mod try_skip_while;
114 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
115 pub use self::try_skip_while::TrySkipWhile;
116 
117 mod try_take_while;
118 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
119 pub use self::try_take_while::TryTakeWhile;
120 
121 #[cfg(not(futures_no_atomic_cas))]
122 #[cfg(feature = "alloc")]
123 mod try_buffer_unordered;
124 #[cfg(not(futures_no_atomic_cas))]
125 #[cfg(feature = "alloc")]
126 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
127 pub use self::try_buffer_unordered::TryBufferUnordered;
128 
129 #[cfg(not(futures_no_atomic_cas))]
130 #[cfg(feature = "alloc")]
131 mod try_buffered;
132 #[cfg(not(futures_no_atomic_cas))]
133 #[cfg(feature = "alloc")]
134 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
135 pub use self::try_buffered::TryBuffered;
136 
137 #[cfg(not(futures_no_atomic_cas))]
138 #[cfg(feature = "alloc")]
139 mod try_for_each_concurrent;
140 #[cfg(not(futures_no_atomic_cas))]
141 #[cfg(feature = "alloc")]
142 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
143 pub use self::try_for_each_concurrent::TryForEachConcurrent;
144 
145 #[cfg(feature = "io")]
146 #[cfg(feature = "std")]
147 mod into_async_read;
148 #[cfg(feature = "io")]
149 #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
150 #[cfg(feature = "std")]
151 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
152 pub use self::into_async_read::IntoAsyncRead;
153 
154 impl<S: ?Sized + TryStream> TryStreamExt for S {}
155 
156 /// Adapters specific to `Result`-returning streams
157 pub trait TryStreamExt: TryStream {
158     /// Wraps the current stream in a new stream which converts the error type
159     /// into the one provided.
160     ///
161     /// # Examples
162     ///
163     /// ```
164     /// # futures::executor::block_on(async {
165     /// use futures::stream::{self, TryStreamExt};
166     ///
167     /// let mut stream =
168     ///     stream::iter(vec![Ok(()), Err(5i32)])
169     ///         .err_into::<i64>();
170     ///
171     /// assert_eq!(stream.try_next().await, Ok(Some(())));
172     /// assert_eq!(stream.try_next().await, Err(5i64));
173     /// # })
174     /// ```
err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, Self::Error: Into<E>,175     fn err_into<E>(self) -> ErrInto<Self, E>
176     where
177         Self: Sized,
178         Self::Error: Into<E>,
179     {
180         assert_stream::<Result<Self::Ok, E>, _>(ErrInto::new(self))
181     }
182 
183     /// Wraps the current stream in a new stream which maps the success value
184     /// using the provided closure.
185     ///
186     /// # Examples
187     ///
188     /// ```
189     /// # futures::executor::block_on(async {
190     /// use futures::stream::{self, TryStreamExt};
191     ///
192     /// let mut stream =
193     ///     stream::iter(vec![Ok(5), Err(0)])
194     ///         .map_ok(|x| x + 2);
195     ///
196     /// assert_eq!(stream.try_next().await, Ok(Some(7)));
197     /// assert_eq!(stream.try_next().await, Err(0));
198     /// # })
199     /// ```
map_ok<T, F>(self, f: F) -> MapOk<Self, F> where Self: Sized, F: FnMut(Self::Ok) -> T,200     fn map_ok<T, F>(self, f: F) -> MapOk<Self, F>
201     where
202         Self: Sized,
203         F: FnMut(Self::Ok) -> T,
204     {
205         assert_stream::<Result<T, Self::Error>, _>(MapOk::new(self, f))
206     }
207 
208     /// Wraps the current stream in a new stream which maps the error value
209     /// using the provided closure.
210     ///
211     /// # Examples
212     ///
213     /// ```
214     /// # futures::executor::block_on(async {
215     /// use futures::stream::{self, TryStreamExt};
216     ///
217     /// let mut stream =
218     ///     stream::iter(vec![Ok(5), Err(0)])
219     ///         .map_err(|x| x + 2);
220     ///
221     /// assert_eq!(stream.try_next().await, Ok(Some(5)));
222     /// assert_eq!(stream.try_next().await, Err(2));
223     /// # })
224     /// ```
map_err<E, F>(self, f: F) -> MapErr<Self, F> where Self: Sized, F: FnMut(Self::Error) -> E,225     fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
226     where
227         Self: Sized,
228         F: FnMut(Self::Error) -> E,
229     {
230         assert_stream::<Result<Self::Ok, E>, _>(MapErr::new(self, f))
231     }
232 
233     /// Chain on a computation for when a value is ready, passing the successful
234     /// results to the provided closure `f`.
235     ///
236     /// This function can be used to run a unit of work when the next successful
237     /// value on a stream is ready. The closure provided will be yielded a value
238     /// when ready, and the returned future will then be run to completion to
239     /// produce the next value on this stream.
240     ///
241     /// Any errors produced by this stream will not be passed to the closure,
242     /// and will be passed through.
243     ///
244     /// The returned value of the closure must implement the `TryFuture` trait
245     /// and can represent some more work to be done before the composed stream
246     /// is finished.
247     ///
248     /// Note that this function consumes the receiving stream and returns a
249     /// wrapped version of it.
250     ///
251     /// To process the entire stream and return a single future representing
252     /// success or error, use `try_for_each` instead.
253     ///
254     /// # Examples
255     ///
256     /// ```
257     /// use futures::channel::mpsc;
258     /// use futures::future;
259     /// use futures::stream::TryStreamExt;
260     ///
261     /// let (_tx, rx) = mpsc::channel::<Result<i32, ()>>(1);
262     ///
263     /// let rx = rx.and_then(|result| {
264     ///     future::ok(if result % 2 == 0 {
265     ///         Some(result)
266     ///     } else {
267     ///         None
268     ///     })
269     /// });
270     /// ```
and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Error = Self::Error>, Self: Sized,271     fn and_then<Fut, F>(self, f: F) -> AndThen<Self, Fut, F>
272     where
273         F: FnMut(Self::Ok) -> Fut,
274         Fut: TryFuture<Error = Self::Error>,
275         Self: Sized,
276     {
277         assert_stream::<Result<Fut::Ok, Fut::Error>, _>(AndThen::new(self, f))
278     }
279 
280     /// Chain on a computation for when an error happens, passing the
281     /// erroneous result to the provided closure `f`.
282     ///
283     /// This function can be used to run a unit of work and attempt to recover from
284     /// an error if one happens. The closure provided will be yielded an error
285     /// when one appears, and the returned future will then be run to completion
286     /// to produce the next value on this stream.
287     ///
288     /// Any successful values produced by this stream will not be passed to the
289     /// closure, and will be passed through.
290     ///
291     /// The returned value of the closure must implement the [`TryFuture`](futures_core::future::TryFuture) trait
292     /// and can represent some more work to be done before the composed stream
293     /// is finished.
294     ///
295     /// Note that this function consumes the receiving stream and returns a
296     /// wrapped version of it.
or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F> where F: FnMut(Self::Error) -> Fut, Fut: TryFuture<Ok = Self::Ok>, Self: Sized,297     fn or_else<Fut, F>(self, f: F) -> OrElse<Self, Fut, F>
298     where
299         F: FnMut(Self::Error) -> Fut,
300         Fut: TryFuture<Ok = Self::Ok>,
301         Self: Sized,
302     {
303         assert_stream::<Result<Self::Ok, Fut::Error>, _>(OrElse::new(self, f))
304     }
305 
306     /// Do something with the success value of this stream, afterwards passing
307     /// it on.
308     ///
309     /// This is similar to the `StreamExt::inspect` method where it allows
310     /// easily inspecting the success value as it passes through the stream, for
311     /// example to debug what's going on.
inspect_ok<F>(self, f: F) -> InspectOk<Self, F> where F: FnMut(&Self::Ok), Self: Sized,312     fn inspect_ok<F>(self, f: F) -> InspectOk<Self, F>
313     where
314         F: FnMut(&Self::Ok),
315         Self: Sized,
316     {
317         assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectOk::new(self, f))
318     }
319 
320     /// Do something with the error value of this stream, afterwards passing it on.
321     ///
322     /// This is similar to the `StreamExt::inspect` method where it allows
323     /// easily inspecting the error value as it passes through the stream, for
324     /// example to debug what's going on.
inspect_err<F>(self, f: F) -> InspectErr<Self, F> where F: FnMut(&Self::Error), Self: Sized,325     fn inspect_err<F>(self, f: F) -> InspectErr<Self, F>
326     where
327         F: FnMut(&Self::Error),
328         Self: Sized,
329     {
330         assert_stream::<Result<Self::Ok, Self::Error>, _>(InspectErr::new(self, f))
331     }
332 
333     /// Wraps a [`TryStream`] into a type that implements
334     /// [`Stream`](futures_core::stream::Stream)
335     ///
336     /// [`TryStream`]s currently do not implement the
337     /// [`Stream`](futures_core::stream::Stream) trait because of limitations
338     /// of the compiler.
339     ///
340     /// # Examples
341     ///
342     /// ```
343     /// use futures::stream::{Stream, TryStream, TryStreamExt};
344     ///
345     /// # type T = i32;
346     /// # type E = ();
347     /// fn make_try_stream() -> impl TryStream<Ok = T, Error = E> { // ... }
348     /// # futures::stream::empty()
349     /// # }
350     /// fn take_stream(stream: impl Stream<Item = Result<T, E>>) { /* ... */ }
351     ///
352     /// take_stream(make_try_stream().into_stream());
353     /// ```
into_stream(self) -> IntoStream<Self> where Self: Sized,354     fn into_stream(self) -> IntoStream<Self>
355     where
356         Self: Sized,
357     {
358         assert_stream::<Result<Self::Ok, Self::Error>, _>(IntoStream::new(self))
359     }
360 
361     /// Creates a future that attempts to resolve the next item in the stream.
362     /// If an error is encountered before the next item, the error is returned
363     /// instead.
364     ///
365     /// This is similar to the `Stream::next` combinator, but returns a
366     /// `Result<Option<T>, E>` rather than an `Option<Result<T, E>>`, making
367     /// for easy use with the `?` operator.
368     ///
369     /// # Examples
370     ///
371     /// ```
372     /// # futures::executor::block_on(async {
373     /// use futures::stream::{self, TryStreamExt};
374     ///
375     /// let mut stream = stream::iter(vec![Ok(()), Err(())]);
376     ///
377     /// assert_eq!(stream.try_next().await, Ok(Some(())));
378     /// assert_eq!(stream.try_next().await, Err(()));
379     /// # })
380     /// ```
try_next(&mut self) -> TryNext<'_, Self> where Self: Unpin,381     fn try_next(&mut self) -> TryNext<'_, Self>
382     where
383         Self: Unpin,
384     {
385         assert_future::<Result<Option<Self::Ok>, Self::Error>, _>(TryNext::new(self))
386     }
387 
388     /// Attempts to run this stream to completion, executing the provided
389     /// asynchronous closure for each element on the stream.
390     ///
391     /// The provided closure will be called for each item this stream produces,
392     /// yielding a future. That future will then be executed to completion
393     /// before moving on to the next item.
394     ///
395     /// The returned value is a [`Future`](futures_core::future::Future) where the
396     /// [`Output`](futures_core::future::Future::Output) type is
397     /// `Result<(), Self::Error>`. If any of the intermediate
398     /// futures or the stream returns an error, this future will return
399     /// immediately with an error.
400     ///
401     /// # Examples
402     ///
403     /// ```
404     /// # futures::executor::block_on(async {
405     /// use futures::future;
406     /// use futures::stream::{self, TryStreamExt};
407     ///
408     /// let mut x = 0i32;
409     ///
410     /// {
411     ///     let fut = stream::repeat(Ok(1)).try_for_each(|item| {
412     ///         x += item;
413     ///         future::ready(if x == 3 { Err(()) } else { Ok(()) })
414     ///     });
415     ///     assert_eq!(fut.await, Err(()));
416     /// }
417     ///
418     /// assert_eq!(x, 3);
419     /// # })
420     /// ```
try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: TryFuture<Ok = (), Error = Self::Error>, Self: Sized,421     fn try_for_each<Fut, F>(self, f: F) -> TryForEach<Self, Fut, F>
422     where
423         F: FnMut(Self::Ok) -> Fut,
424         Fut: TryFuture<Ok = (), Error = Self::Error>,
425         Self: Sized,
426     {
427         assert_future::<Result<(), Self::Error>, _>(TryForEach::new(self, f))
428     }
429 
430     /// Skip elements on this stream while the provided asynchronous predicate
431     /// resolves to `true`.
432     ///
433     /// This function is similar to
434     /// [`StreamExt::skip_while`](crate::stream::StreamExt::skip_while) but exits
435     /// early if an error occurs.
436     ///
437     /// # Examples
438     ///
439     /// ```
440     /// # futures::executor::block_on(async {
441     /// use futures::future;
442     /// use futures::stream::{self, TryStreamExt};
443     ///
444     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(3), Ok(2)]);
445     /// let stream = stream.try_skip_while(|x| future::ready(Ok(*x < 3)));
446     ///
447     /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
448     /// assert_eq!(output, Ok(vec![3, 2]));
449     /// # })
450     /// ```
try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,451     fn try_skip_while<Fut, F>(self, f: F) -> TrySkipWhile<Self, Fut, F>
452     where
453         F: FnMut(&Self::Ok) -> Fut,
454         Fut: TryFuture<Ok = bool, Error = Self::Error>,
455         Self: Sized,
456     {
457         assert_stream::<Result<Self::Ok, Self::Error>, _>(TrySkipWhile::new(self, f))
458     }
459 
460     /// Take elements on this stream while the provided asynchronous predicate
461     /// resolves to `true`.
462     ///
463     /// This function is similar to
464     /// [`StreamExt::take_while`](crate::stream::StreamExt::take_while) but exits
465     /// early if an error occurs.
466     ///
467     /// # Examples
468     ///
469     /// ```
470     /// # futures::executor::block_on(async {
471     /// use futures::future;
472     /// use futures::stream::{self, TryStreamExt};
473     ///
474     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Ok(2)]);
475     /// let stream = stream.try_take_while(|x| future::ready(Ok(*x < 3)));
476     ///
477     /// let output: Result<Vec<i32>, i32> = stream.try_collect().await;
478     /// assert_eq!(output, Ok(vec![1, 2]));
479     /// # })
480     /// ```
try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F> where F: FnMut(&Self::Ok) -> Fut, Fut: TryFuture<Ok = bool, Error = Self::Error>, Self: Sized,481     fn try_take_while<Fut, F>(self, f: F) -> TryTakeWhile<Self, Fut, F>
482     where
483         F: FnMut(&Self::Ok) -> Fut,
484         Fut: TryFuture<Ok = bool, Error = Self::Error>,
485         Self: Sized,
486     {
487         assert_stream::<Result<Self::Ok, Self::Error>, _>(TryTakeWhile::new(self, f))
488     }
489 
490     /// Attempts to run this stream to completion, executing the provided asynchronous
491     /// closure for each element on the stream concurrently as elements become
492     /// available, exiting as soon as an error occurs.
493     ///
494     /// This is similar to
495     /// [`StreamExt::for_each_concurrent`](crate::stream::StreamExt::for_each_concurrent),
496     /// but will resolve to an error immediately if the underlying stream or the provided
497     /// closure return an error.
498     ///
499     /// This method is only available when the `std` or `alloc` feature of this
500     /// library is activated, and it is activated by default.
501     ///
502     /// # Examples
503     ///
504     /// ```
505     /// # futures::executor::block_on(async {
506     /// use futures::channel::oneshot;
507     /// use futures::stream::{self, StreamExt, TryStreamExt};
508     ///
509     /// let (tx1, rx1) = oneshot::channel();
510     /// let (tx2, rx2) = oneshot::channel();
511     /// let (_tx3, rx3) = oneshot::channel();
512     ///
513     /// let stream = stream::iter(vec![rx1, rx2, rx3]);
514     /// let fut = stream.map(Ok).try_for_each_concurrent(
515     ///     /* limit */ 2,
516     ///     |rx| async move {
517     ///         let res: Result<(), oneshot::Canceled> = rx.await;
518     ///         res
519     ///     }
520     /// );
521     ///
522     /// tx1.send(()).unwrap();
523     /// // Drop the second sender so that `rx2` resolves to `Canceled`.
524     /// drop(tx2);
525     ///
526     /// // The final result is an error because the second future
527     /// // resulted in an error.
528     /// assert_eq!(Err(oneshot::Canceled), fut.await);
529     /// # })
530     /// ```
531     #[cfg(not(futures_no_atomic_cas))]
532     #[cfg(feature = "alloc")]
try_for_each_concurrent<Fut, F>( self, limit: impl Into<Option<usize>>, f: F, ) -> TryForEachConcurrent<Self, Fut, F> where F: FnMut(Self::Ok) -> Fut, Fut: Future<Output = Result<(), Self::Error>>, Self: Sized,533     fn try_for_each_concurrent<Fut, F>(
534         self,
535         limit: impl Into<Option<usize>>,
536         f: F,
537     ) -> TryForEachConcurrent<Self, Fut, F>
538     where
539         F: FnMut(Self::Ok) -> Fut,
540         Fut: Future<Output = Result<(), Self::Error>>,
541         Self: Sized,
542     {
543         assert_future::<Result<(), Self::Error>, _>(TryForEachConcurrent::new(
544             self,
545             limit.into(),
546             f,
547         ))
548     }
549 
550     /// Attempt to transform a stream into a collection,
551     /// returning a future representing the result of that computation.
552     ///
553     /// This combinator will collect all successful results of this stream and
554     /// collect them into the specified collection type. If an error happens then all
555     /// collected elements will be dropped and the error will be returned.
556     ///
557     /// The returned future will be resolved when the stream terminates.
558     ///
559     /// # Examples
560     ///
561     /// ```
562     /// # futures::executor::block_on(async {
563     /// use futures::channel::mpsc;
564     /// use futures::stream::TryStreamExt;
565     /// use std::thread;
566     ///
567     /// let (tx, rx) = mpsc::unbounded();
568     ///
569     /// thread::spawn(move || {
570     ///     for i in 1..=5 {
571     ///         tx.unbounded_send(Ok(i)).unwrap();
572     ///     }
573     ///     tx.unbounded_send(Err(6)).unwrap();
574     /// });
575     ///
576     /// let output: Result<Vec<i32>, i32> = rx.try_collect().await;
577     /// assert_eq!(output, Err(6));
578     /// # })
579     /// ```
try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C> where Self: Sized,580     fn try_collect<C: Default + Extend<Self::Ok>>(self) -> TryCollect<Self, C>
581     where
582         Self: Sized,
583     {
584         assert_future::<Result<C, Self::Error>, _>(TryCollect::new(self))
585     }
586 
587     /// An adaptor for chunking up successful items of the stream inside a vector.
588     ///
589     /// This combinator will attempt to pull successful items from this stream and buffer
590     /// them into a local vector. At most `capacity` items will get buffered
591     /// before they're yielded from the returned stream.
592     ///
593     /// Note that the vectors returned from this stream may not always have
594     /// `capacity` elements. If the underlying stream ended and only a partial
595     /// vector was created, it'll be returned. Additionally if an error happens
596     /// from the underlying stream then the currently buffered items will be
597     /// yielded.
598     ///
599     /// This method is only available when the `std` or `alloc` feature of this
600     /// library is activated, and it is activated by default.
601     ///
602     /// This function is similar to
603     /// [`StreamExt::chunks`](crate::stream::StreamExt::chunks) but exits
604     /// early if an error occurs.
605     ///
606     /// # Examples
607     ///
608     /// ```
609     /// # futures::executor::block_on(async {
610     /// use futures::stream::{self, TryChunksError, TryStreamExt};
611     ///
612     /// let stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2), Ok(3), Err(4), Ok(5), Ok(6)]);
613     /// let mut stream = stream.try_chunks(2);
614     ///
615     /// assert_eq!(stream.try_next().await, Ok(Some(vec![1, 2])));
616     /// assert_eq!(stream.try_next().await, Err(TryChunksError(vec![3], 4)));
617     /// assert_eq!(stream.try_next().await, Ok(Some(vec![5, 6])));
618     /// # })
619     /// ```
620     ///
621     /// # Panics
622     ///
623     /// This method will panic if `capacity` is zero.
624     #[cfg(feature = "alloc")]
try_chunks(self, capacity: usize) -> TryChunks<Self> where Self: Sized,625     fn try_chunks(self, capacity: usize) -> TryChunks<Self>
626     where
627         Self: Sized,
628     {
629         assert_stream::<Result<Vec<Self::Ok>, TryChunksError<Self::Ok, Self::Error>>, _>(
630             TryChunks::new(self, capacity),
631         )
632     }
633 
634     /// Attempt to filter the values produced by this stream according to the
635     /// provided asynchronous closure.
636     ///
637     /// As values of this stream are made available, the provided predicate `f`
638     /// will be run on them. If the predicate returns a `Future` which resolves
639     /// to `true`, then the stream will yield the value, but if the predicate
640     /// returns a `Future` which resolves to `false`, then the value will be
641     /// discarded and the next value will be produced.
642     ///
643     /// All errors are passed through without filtering in this combinator.
644     ///
645     /// Note that this function consumes the stream passed into it and returns a
646     /// wrapped version of it, similar to the existing `filter` methods in
647     /// the standard library.
648     ///
649     /// # Examples
650     /// ```
651     /// # futures::executor::block_on(async {
652     /// use futures::future;
653     /// use futures::stream::{self, StreamExt, TryStreamExt};
654     ///
655     /// let stream = stream::iter(vec![Ok(1i32), Ok(2i32), Ok(3i32), Err("error")]);
656     /// let mut evens = stream.try_filter(|x| {
657     ///     future::ready(x % 2 == 0)
658     /// });
659     ///
660     /// assert_eq!(evens.next().await, Some(Ok(2)));
661     /// assert_eq!(evens.next().await, Some(Err("error")));
662     /// # })
663     /// ```
try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F> where Fut: Future<Output = bool>, F: FnMut(&Self::Ok) -> Fut, Self: Sized,664     fn try_filter<Fut, F>(self, f: F) -> TryFilter<Self, Fut, F>
665     where
666         Fut: Future<Output = bool>,
667         F: FnMut(&Self::Ok) -> Fut,
668         Self: Sized,
669     {
670         assert_stream::<Result<Self::Ok, Self::Error>, _>(TryFilter::new(self, f))
671     }
672 
673     /// Attempt to filter the values produced by this stream while
674     /// simultaneously mapping them to a different type according to the
675     /// provided asynchronous closure.
676     ///
677     /// As values of this stream are made available, the provided function will
678     /// be run on them. If the future returned by the predicate `f` resolves to
679     /// [`Some(item)`](Some) then the stream will yield the value `item`, but if
680     /// it resolves to [`None`] then the next value will be produced.
681     ///
682     /// All errors are passed through without filtering in this combinator.
683     ///
684     /// Note that this function consumes the stream passed into it and returns a
685     /// wrapped version of it, similar to the existing `filter_map` methods in
686     /// the standard library.
687     ///
688     /// # Examples
689     /// ```
690     /// # futures::executor::block_on(async {
691     /// use futures::stream::{self, StreamExt, TryStreamExt};
692     /// use futures::pin_mut;
693     ///
694     /// let stream = stream::iter(vec![Ok(1i32), Ok(6i32), Err("error")]);
695     /// let halves = stream.try_filter_map(|x| async move {
696     ///     let ret = if x % 2 == 0 { Some(x / 2) } else { None };
697     ///     Ok(ret)
698     /// });
699     ///
700     /// pin_mut!(halves);
701     /// assert_eq!(halves.next().await, Some(Ok(3)));
702     /// assert_eq!(halves.next().await, Some(Err("error")));
703     /// # })
704     /// ```
try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F> where Fut: TryFuture<Ok = Option<T>, Error = Self::Error>, F: FnMut(Self::Ok) -> Fut, Self: Sized,705     fn try_filter_map<Fut, F, T>(self, f: F) -> TryFilterMap<Self, Fut, F>
706     where
707         Fut: TryFuture<Ok = Option<T>, Error = Self::Error>,
708         F: FnMut(Self::Ok) -> Fut,
709         Self: Sized,
710     {
711         assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
712     }
713 
714     /// Flattens a stream of streams into just one continuous stream.
715     ///
716     /// If this stream's elements are themselves streams then this combinator
717     /// will flatten out the entire stream to one long chain of elements. Any
718     /// errors are passed through without looking at them, but otherwise each
719     /// individual stream will get exhausted before moving on to the next.
720     ///
721     /// # Examples
722     ///
723     /// ```
724     /// # futures::executor::block_on(async {
725     /// use futures::channel::mpsc;
726     /// use futures::stream::{StreamExt, TryStreamExt};
727     /// use std::thread;
728     ///
729     /// let (tx1, rx1) = mpsc::unbounded();
730     /// let (tx2, rx2) = mpsc::unbounded();
731     /// let (tx3, rx3) = mpsc::unbounded();
732     ///
733     /// thread::spawn(move || {
734     ///     tx1.unbounded_send(Ok(1)).unwrap();
735     /// });
736     /// thread::spawn(move || {
737     ///     tx2.unbounded_send(Ok(2)).unwrap();
738     ///     tx2.unbounded_send(Err(3)).unwrap();
739     ///     tx2.unbounded_send(Ok(4)).unwrap();
740     /// });
741     /// thread::spawn(move || {
742     ///     tx3.unbounded_send(Ok(rx1)).unwrap();
743     ///     tx3.unbounded_send(Ok(rx2)).unwrap();
744     ///     tx3.unbounded_send(Err(5)).unwrap();
745     /// });
746     ///
747     /// let mut stream = rx3.try_flatten();
748     /// assert_eq!(stream.next().await, Some(Ok(1)));
749     /// assert_eq!(stream.next().await, Some(Ok(2)));
750     /// assert_eq!(stream.next().await, Some(Err(3)));
751     /// assert_eq!(stream.next().await, Some(Ok(4)));
752     /// assert_eq!(stream.next().await, Some(Err(5)));
753     /// assert_eq!(stream.next().await, None);
754     /// # });
755     /// ```
try_flatten(self) -> TryFlatten<Self> where Self::Ok: TryStream, <Self::Ok as TryStream>::Error: From<Self::Error>, Self: Sized,756     fn try_flatten(self) -> TryFlatten<Self>
757     where
758         Self::Ok: TryStream,
759         <Self::Ok as TryStream>::Error: From<Self::Error>,
760         Self: Sized,
761     {
762         assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
763             TryFlatten::new(self),
764         )
765     }
766 
767     /// Attempt to execute an accumulating asynchronous computation over a
768     /// stream, collecting all the values into one final result.
769     ///
770     /// This combinator will accumulate all values returned by this stream
771     /// according to the closure provided. The initial state is also provided to
772     /// this method and then is returned again by each execution of the closure.
773     /// Once the entire stream has been exhausted the returned future will
774     /// resolve to this value.
775     ///
776     /// This method is similar to [`fold`](crate::stream::StreamExt::fold), but will
777     /// exit early if an error is encountered in either the stream or the
778     /// provided closure.
779     ///
780     /// # Examples
781     ///
782     /// ```
783     /// # futures::executor::block_on(async {
784     /// use futures::stream::{self, TryStreamExt};
785     ///
786     /// let number_stream = stream::iter(vec![Ok::<i32, i32>(1), Ok(2)]);
787     /// let sum = number_stream.try_fold(0, |acc, x| async move { Ok(acc + x) });
788     /// assert_eq!(sum.await, Ok(3));
789     ///
790     /// let number_stream_with_err = stream::iter(vec![Ok::<i32, i32>(1), Err(2), Ok(1)]);
791     /// let sum = number_stream_with_err.try_fold(0, |acc, x| async move { Ok(acc + x) });
792     /// assert_eq!(sum.await, Err(2));
793     /// # })
794     /// ```
try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F> where F: FnMut(T, Self::Ok) -> Fut, Fut: TryFuture<Ok = T, Error = Self::Error>, Self: Sized,795     fn try_fold<T, Fut, F>(self, init: T, f: F) -> TryFold<Self, Fut, T, F>
796     where
797         F: FnMut(T, Self::Ok) -> Fut,
798         Fut: TryFuture<Ok = T, Error = Self::Error>,
799         Self: Sized,
800     {
801         assert_future::<Result<T, Self::Error>, _>(TryFold::new(self, f, init))
802     }
803 
804     /// Attempt to concatenate all items of a stream into a single
805     /// extendable destination, returning a future representing the end result.
806     ///
807     /// This combinator will extend the first item with the contents of all
808     /// the subsequent successful results of the stream. If the stream is empty,
809     /// the default value will be returned.
810     ///
811     /// Works with all collections that implement the [`Extend`](std::iter::Extend) trait.
812     ///
813     /// This method is similar to [`concat`](crate::stream::StreamExt::concat), but will
814     /// exit early if an error is encountered in the stream.
815     ///
816     /// # Examples
817     ///
818     /// ```
819     /// # futures::executor::block_on(async {
820     /// use futures::channel::mpsc;
821     /// use futures::stream::TryStreamExt;
822     /// use std::thread;
823     ///
824     /// let (tx, rx) = mpsc::unbounded::<Result<Vec<i32>, ()>>();
825     ///
826     /// thread::spawn(move || {
827     ///     for i in (0..3).rev() {
828     ///         let n = i * 3;
829     ///         tx.unbounded_send(Ok(vec![n + 1, n + 2, n + 3])).unwrap();
830     ///     }
831     /// });
832     ///
833     /// let result = rx.try_concat().await;
834     ///
835     /// assert_eq!(result, Ok(vec![7, 8, 9, 4, 5, 6, 1, 2, 3]));
836     /// # });
837     /// ```
try_concat(self) -> TryConcat<Self> where Self: Sized, Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,838     fn try_concat(self) -> TryConcat<Self>
839     where
840         Self: Sized,
841         Self::Ok: Extend<<<Self as TryStream>::Ok as IntoIterator>::Item> + IntoIterator + Default,
842     {
843         assert_future::<Result<Self::Ok, Self::Error>, _>(TryConcat::new(self))
844     }
845 
846     /// Attempt to execute several futures from a stream concurrently (unordered).
847     ///
848     /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
849     /// that matches the stream's `Error` type.
850     ///
851     /// This adaptor will buffer up to `n` futures and then return their
852     /// outputs in the order in which they complete. If the underlying stream
853     /// returns an error, it will be immediately propagated.
854     ///
855     /// The returned stream will be a stream of results, each containing either
856     /// an error or a future's output. An error can be produced either by the
857     /// underlying stream itself or by one of the futures it yielded.
858     ///
859     /// This method is only available when the `std` or `alloc` feature of this
860     /// library is activated, and it is activated by default.
861     ///
862     /// # Examples
863     ///
864     /// Results are returned in the order of completion:
865     /// ```
866     /// # futures::executor::block_on(async {
867     /// use futures::channel::oneshot;
868     /// use futures::stream::{self, StreamExt, TryStreamExt};
869     ///
870     /// let (send_one, recv_one) = oneshot::channel();
871     /// let (send_two, recv_two) = oneshot::channel();
872     ///
873     /// let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
874     ///
875     /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
876     ///
877     /// send_two.send(2i32)?;
878     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
879     ///
880     /// send_one.send(1i32)?;
881     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
882     ///
883     /// assert_eq!(buffered.next().await, None);
884     /// # Ok::<(), i32>(()) }).unwrap();
885     /// ```
886     ///
887     /// Errors from the underlying stream itself are propagated:
888     /// ```
889     /// # futures::executor::block_on(async {
890     /// use futures::channel::mpsc;
891     /// use futures::stream::{StreamExt, TryStreamExt};
892     ///
893     /// let (sink, stream_of_futures) = mpsc::unbounded();
894     /// let mut buffered = stream_of_futures.try_buffer_unordered(10);
895     ///
896     /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
897     /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
898     ///
899     /// sink.unbounded_send(Err("error in the stream"))?;
900     /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
901     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
902     /// ```
903     #[cfg(not(futures_no_atomic_cas))]
904     #[cfg(feature = "alloc")]
try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,905     fn try_buffer_unordered(self, n: usize) -> TryBufferUnordered<Self>
906     where
907         Self::Ok: TryFuture<Error = Self::Error>,
908         Self: Sized,
909     {
910         assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(
911             TryBufferUnordered::new(self, n),
912         )
913     }
914 
915     /// Attempt to execute several futures from a stream concurrently.
916     ///
917     /// This stream's `Ok` type must be a [`TryFuture`](futures_core::future::TryFuture) with an `Error` type
918     /// that matches the stream's `Error` type.
919     ///
920     /// This adaptor will buffer up to `n` futures and then return their
921     /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
922     /// be immediately propagated.
923     ///
924     /// The returned stream will be a stream of results, each containing either
925     /// an error or a future's output. An error can be produced either by the
926     /// underlying stream itself or by one of the futures it yielded.
927     ///
928     /// This method is only available when the `std` or `alloc` feature of this
929     /// library is activated, and it is activated by default.
930     ///
931     /// # Examples
932     ///
933     /// Results are returned in the order of addition:
934     /// ```
935     /// # futures::executor::block_on(async {
936     /// use futures::channel::oneshot;
937     /// use futures::future::lazy;
938     /// use futures::stream::{self, StreamExt, TryStreamExt};
939     ///
940     /// let (send_one, recv_one) = oneshot::channel();
941     /// let (send_two, recv_two) = oneshot::channel();
942     ///
943     /// let mut buffered = lazy(move |cx| {
944     ///     let stream_of_futures = stream::iter(vec![Ok(recv_one), Ok(recv_two)]);
945     ///
946     ///     let mut buffered = stream_of_futures.try_buffered(10);
947     ///
948     ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
949     ///
950     ///     send_two.send(2i32)?;
951     ///     assert!(buffered.try_poll_next_unpin(cx).is_pending());
952     ///     Ok::<_, i32>(buffered)
953     /// }).await?;
954     ///
955     /// send_one.send(1i32)?;
956     /// assert_eq!(buffered.next().await, Some(Ok(1i32)));
957     /// assert_eq!(buffered.next().await, Some(Ok(2i32)));
958     ///
959     /// assert_eq!(buffered.next().await, None);
960     /// # Ok::<(), i32>(()) }).unwrap();
961     /// ```
962     ///
963     /// Errors from the underlying stream itself are propagated:
964     /// ```
965     /// # futures::executor::block_on(async {
966     /// use futures::channel::mpsc;
967     /// use futures::stream::{StreamExt, TryStreamExt};
968     ///
969     /// let (sink, stream_of_futures) = mpsc::unbounded();
970     /// let mut buffered = stream_of_futures.try_buffered(10);
971     ///
972     /// sink.unbounded_send(Ok(async { Ok(7i32) }))?;
973     /// assert_eq!(buffered.next().await, Some(Ok(7i32)));
974     ///
975     /// sink.unbounded_send(Err("error in the stream"))?;
976     /// assert_eq!(buffered.next().await, Some(Err("error in the stream")));
977     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
978     /// ```
979     #[cfg(not(futures_no_atomic_cas))]
980     #[cfg(feature = "alloc")]
try_buffered(self, n: usize) -> TryBuffered<Self> where Self::Ok: TryFuture<Error = Self::Error>, Self: Sized,981     fn try_buffered(self, n: usize) -> TryBuffered<Self>
982     where
983         Self::Ok: TryFuture<Error = Self::Error>,
984         Self: Sized,
985     {
986         assert_stream::<Result<<Self::Ok as TryFuture>::Ok, Self::Error>, _>(TryBuffered::new(
987             self, n,
988         ))
989     }
990 
991     // TODO: false positive warning from rustdoc. Verify once #43466 settles
992     //
993     /// A convenience method for calling [`TryStream::try_poll_next`] on [`Unpin`]
994     /// stream types.
try_poll_next_unpin( &mut self, cx: &mut Context<'_>, ) -> Poll<Option<Result<Self::Ok, Self::Error>>> where Self: Unpin,995     fn try_poll_next_unpin(
996         &mut self,
997         cx: &mut Context<'_>,
998     ) -> Poll<Option<Result<Self::Ok, Self::Error>>>
999     where
1000         Self: Unpin,
1001     {
1002         Pin::new(self).try_poll_next(cx)
1003     }
1004 
1005     /// Wraps a [`TryStream`] into a stream compatible with libraries using
1006     /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
1007     /// ```
1008     /// # if cfg!(miri) { return; } // Miri does not support epoll
1009     /// use futures::future::{FutureExt, TryFutureExt};
1010     /// # let (tx, rx) = futures::channel::oneshot::channel();
1011     ///
1012     /// let future03 = async {
1013     ///     println!("Running on the pool");
1014     ///     tx.send(42).unwrap();
1015     /// };
1016     ///
1017     /// let future01 = future03
1018     ///     .unit_error() // Make it a TryFuture
1019     ///     .boxed()  // Make it Unpin
1020     ///     .compat();
1021     ///
1022     /// tokio::run(future01);
1023     /// # assert_eq!(42, futures::executor::block_on(rx).unwrap());
1024     /// ```
1025     #[cfg(feature = "compat")]
1026     #[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
compat(self) -> Compat<Self> where Self: Sized + Unpin,1027     fn compat(self) -> Compat<Self>
1028     where
1029         Self: Sized + Unpin,
1030     {
1031         Compat::new(self)
1032     }
1033 
1034     /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
1035     ///
1036     /// This method is only available when the `std` feature of this
1037     /// library is activated, and it is activated by default.
1038     ///
1039     /// # Examples
1040     ///
1041     /// ```
1042     /// # futures::executor::block_on(async {
1043     /// use futures::stream::{self, TryStreamExt};
1044     /// use futures::io::AsyncReadExt;
1045     ///
1046     /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
1047     /// let mut reader = stream.into_async_read();
1048     ///
1049     /// let mut buf = Vec::new();
1050     /// reader.read_to_end(&mut buf).await.unwrap();
1051     /// assert_eq!(buf, [1, 2, 3, 4, 5]);
1052     /// # })
1053     /// ```
1054     #[cfg(feature = "io")]
1055     #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
1056     #[cfg(feature = "std")]
into_async_read(self) -> IntoAsyncRead<Self> where Self: Sized + TryStreamExt<Error = std::io::Error>, Self::Ok: AsRef<[u8]>,1057     fn into_async_read(self) -> IntoAsyncRead<Self>
1058     where
1059         Self: Sized + TryStreamExt<Error = std::io::Error>,
1060         Self::Ok: AsRef<[u8]>,
1061     {
1062         crate::io::assert_read(IntoAsyncRead::new(self))
1063     }
1064 }
1065