1# Asynchronous streams for Rust 2 3Asynchronous stream of elements. 4 5Provides two macros, `stream!` and `try_stream!`, allowing the caller to 6define asynchronous streams of elements. These are implemented using `async` 7& `await` notation. The `stream!` macro works without unstable features. 8 9The `stream!` macro returns an anonymous type implementing the [`Stream`] 10trait. The `Item` associated type is the type of the values yielded from the 11stream. The `try_stream!` also returns an anonymous type implementing the 12[`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The 13`try_stream!` macro supports using `?` notiation as part of the 14implementation. 15 16## Usage 17 18A basic stream yielding numbers. Values are yielded using the `yield` 19keyword. The stream block must return `()`. 20 21```rust 22use async_stream::stream; 23 24use futures_util::pin_mut; 25use futures_util::stream::StreamExt; 26 27#[tokio::main] 28async fn main() { 29 let s = stream! { 30 for i in 0..3 { 31 yield i; 32 } 33 }; 34 35 pin_mut!(s); // needed for iteration 36 37 while let Some(value) = s.next().await { 38 println!("got {}", value); 39 } 40} 41``` 42 43Streams may be returned by using `impl Stream<Item = T>`: 44 45```rust 46use async_stream::stream; 47 48use futures_core::stream::Stream; 49use futures_util::pin_mut; 50use futures_util::stream::StreamExt; 51 52fn zero_to_three() -> impl Stream<Item = u32> { 53 stream! { 54 for i in 0..3 { 55 yield i; 56 } 57 } 58} 59 60#[tokio::main] 61async fn main() { 62 let s = zero_to_three(); 63 pin_mut!(s); // needed for iteration 64 65 while let Some(value) = s.next().await { 66 println!("got {}", value); 67 } 68} 69``` 70 71Streams may be implemented in terms of other streams: 72 73```rust 74use async_stream::stream; 75 76use futures_core::stream::Stream; 77use futures_util::pin_mut; 78use futures_util::stream::StreamExt; 79 80fn zero_to_three() -> impl Stream<Item = u32> { 81 stream! { 82 for i in 0..3 { 83 yield i; 84 } 85 } 86} 87 88fn double<S: Stream<Item = u32>>(input: S) 89 -> impl Stream<Item = u32> 90{ 91 stream! { 92 pin_mut!(input); 93 while let Some(value) = input.next().await { 94 yield value * 2; 95 } 96 } 97} 98 99#[tokio::main] 100async fn main() { 101 let s = double(zero_to_three()); 102 pin_mut!(s); // needed for iteration 103 104 while let Some(value) = s.next().await { 105 println!("got {}", value); 106 } 107} 108``` 109 110Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` 111of the returned stream is `Result` with `Ok` being the value yielded and 112`Err` the error type returned by `?`. 113 114```rust 115use tokio::net::{TcpListener, TcpStream}; 116 117use async_stream::try_stream; 118use futures_core::stream::Stream; 119 120use std::io; 121use std::net::SocketAddr; 122 123fn bind_and_accept(addr: SocketAddr) 124 -> impl Stream<Item = io::Result<TcpStream>> 125{ 126 try_stream! { 127 let mut listener = TcpListener::bind(&addr)?; 128 129 loop { 130 let (stream, addr) = listener.accept().await?; 131 println!("received on {:?}", addr); 132 yield stream; 133 } 134 } 135} 136``` 137 138## Implementation 139 140The `stream!` and `try_stream!` macros are implemented using proc macros. 141Given that proc macros in expression position are not supported on stable 142rust, a hack similar to the one provided by the [`proc-macro-hack`] crate is 143used. The macro searches the syntax tree for instances of `sender.send($expr)` and 144transforms them into `sender.send($expr).await`. 145 146The stream uses a lightweight sender to send values from the stream 147implementation to the caller. When entering the stream, an `Option<T>` is 148stored on the stack. A pointer to the cell is stored in a thread local and 149`poll` is called on the async block. When `poll` returns. 150`sender.send(value)` stores the value that cell and yields back to the 151caller. 152 153## Limitations 154 155`async-stream` suffers from the same limitations as the [`proc-macro-hack`] 156crate. Primarily, nesting support must be implemented using a `TT-muncher`. 157If large `stream!` blocks are used, the caller will be required to add 158`#![recursion_limit = "..."]` to their crate. 159 160A `stream!` macro may only contain up to 64 macro invocations. 161 162[`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html 163[`proc-macro-hack`]: https://github.com/dtolnay/proc-macro-hack/ 164 165## License 166 167This project is licensed under the [MIT license](LICENSE). 168 169### Contribution 170 171Unless you explicitly state otherwise, any contribution intentionally submitted 172for inclusion in `async-stream` by you, shall be licensed as MIT, without any 173additional terms or conditions. 174