#![warn(
    missing_debug_implementations,
    missing_docs,
    rust_2018_idioms,
    unreachable_pub
)]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]

//! Asynchronous stream of elements.
//!
//! Provides two macros, `stream!` and `try_stream!`, allowing the caller to
//! define asynchronous streams of elements. These are implemented using `async`
//! & `await` notation. This crate works without unstable features.
//!
//! The `stream!` macro returns an anonymous type implementing the [`Stream`]
//! trait. The `Item` associated type is the type of the values yielded from the
//! stream. The `try_stream!` macro also returns an anonymous type implementing
//! the [`Stream`] trait, but the `Item` associated type is `Result<T, E>`. The
//! `try_stream!` macro supports using `?` notation as part of the
//! implementation.
//!
//! # Usage
//!
//! A basic stream yielding numbers. Values are yielded using the `yield`
//! keyword. The stream block must return `()`.
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! #[tokio::main]
//! async fn main() {
//!     let s = stream! {
//!         for i in 0..3 {
//!             yield i;
//!         }
//!     };
//!
//!     pin_mut!(s); // needed for iteration
//!
//!     while let Some(value) = s.next().await {
//!         println!("got {}", value);
//!     }
//! }
//! ```
//!
//! Streams may be returned by using `impl Stream<Item = T>`:
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_core::stream::Stream;
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! fn zero_to_three() -> impl Stream<Item = u32> {
//!     stream! {
//!         for i in 0..3 {
//!             yield i;
//!         }
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let s = zero_to_three();
//!     pin_mut!(s); // needed for iteration
//!
//!     while let Some(value) = s.next().await {
//!         println!("got {}", value);
//!     }
//! }
//! ```
//!
//! Streams may be implemented in terms of other streams - `async-stream` provides `for await`
//! syntax to assist with this:
//!
//! ```rust
//! use async_stream::stream;
//!
//! use futures_core::stream::Stream;
//! use futures_util::pin_mut;
//! use futures_util::stream::StreamExt;
//!
//! fn zero_to_three() -> impl Stream<Item = u32> {
//!     stream! {
//!         for i in 0..3 {
//!             yield i;
//!         }
//!     }
//! }
//!
//! fn double<S: Stream<Item = u32>>(input: S)
//!     -> impl Stream<Item = u32>
//! {
//!     stream! {
//!         for await value in input {
//!             yield value * 2;
//!         }
//!     }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     let s = double(zero_to_three());
//!     pin_mut!(s); // needed for iteration
//!
//!     while let Some(value) = s.next().await {
//!         println!("got {}", value);
//!     }
//! }
//! ```
//!
//! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item`
//! of the returned stream is `Result` with `Ok` being the value yielded and
//! `Err` the error type returned by `?`.
//!
//! ```rust
//! use tokio::net::{TcpListener, TcpStream};
//!
//! use async_stream::try_stream;
//! use futures_core::stream::Stream;
//!
//! use std::io;
//! use std::net::SocketAddr;
//!
//! fn bind_and_accept(addr: SocketAddr)
//!     -> impl Stream<Item = io::Result<TcpStream>>
//! {
//!     try_stream! {
//!         let mut listener = TcpListener::bind(addr).await?;
//!
//!         loop {
//!             let (stream, addr) = listener.accept().await?;
//!             println!("received on {:?}", addr);
//!             yield stream;
//!         }
//!     }
//! }
//! ```
//!
//! # Implementation
//!
//! The `stream!` and `try_stream!` macros are implemented using proc macros.
//! The macro searches the syntax tree for instances of `yield $expr` and
//! transforms them into `sender.send($expr).await`.
//!
//! The stream uses a lightweight sender to send values from the stream
//! implementation to the caller. When entering the stream, an `Option<T>` cell
//! is stored on the stack. A pointer to the cell is stored in a thread local
//! and `poll` is called on the async block. During that call,
//! `sender.send(value)` stores the value in that cell and yields back to the
//! caller, at which point `poll` returns.
//!
[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html mod async_stream; mod next; #[doc(hidden)] pub mod yielder; // Used by the macro, but not intended to be accessed publicly. #[doc(hidden)] pub use crate::async_stream::AsyncStream; #[doc(hidden)] pub use async_stream_impl; /// Asynchronous stream /// /// See [crate](index.html) documentation for more details. /// /// # Examples /// /// ``` /// use async_stream::stream; /// /// use futures_util::pin_mut; /// use futures_util::stream::StreamExt; /// /// #[tokio::main] /// async fn main() { /// let s = stream! { /// for i in 0..3 { /// yield i; /// } /// }; /// /// pin_mut!(s); // needed for iteration /// /// while let Some(value) = s.next().await { /// println!("got {}", value); /// } /// } /// ``` #[macro_export] macro_rules! stream { ($($tt:tt)*) => { $crate::async_stream_impl::stream_inner!(($crate) $($tt)*) } } /// Asynchronous fallible stream /// /// See [crate](index.html) documentation for more details. /// /// # Examples /// /// ``` /// use tokio::net::{TcpListener, TcpStream}; /// /// use async_stream::try_stream; /// use futures_core::stream::Stream; /// /// use std::io; /// use std::net::SocketAddr; /// /// fn bind_and_accept(addr: SocketAddr) /// -> impl Stream> /// { /// try_stream! { /// let mut listener = TcpListener::bind(addr).await?; /// /// loop { /// let (stream, addr) = listener.accept().await?; /// println!("received on {:?}", addr); /// yield stream; /// } /// } /// } /// ``` #[macro_export] macro_rules! try_stream { ($($tt:tt)*) => { $crate::async_stream_impl::try_stream_inner!(($crate) $($tt)*) } } #[doc(hidden)] pub mod reexport { #[doc(hidden)] pub use crate::next::next; }