1 #![warn( 2 missing_debug_implementations, 3 missing_docs, 4 rust_2018_idioms, 5 unreachable_pub 6 )] 7 #![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))] 8 9 //! Asynchronous stream of elements. 10 //! 11 //! Provides two macros, `stream!` and `try_stream!`, allowing the caller to 12 //! define asynchronous streams of elements. These are implemented using `async` 13 //! & `await` notation. This crate works without unstable features. 14 //! 15 //! The `stream!` macro returns an anonymous type implementing the [`Stream`] 16 //! trait. The `Item` associated type is the type of the values yielded from the 17 //! stream. The `try_stream!` also returns an anonymous type implementing the 18 //! [`Stream`] trait, but the `Item` associated type is `Result<T, Error>`. The 19 //! `try_stream!` macro supports using `?` notiation as part of the 20 //! implementation. 21 //! 22 //! # Usage 23 //! 24 //! A basic stream yielding numbers. Values are yielded using the `yield` 25 //! keyword. The stream block must return `()`. 26 //! 27 //! ```rust 28 //! use async_stream::stream; 29 //! 30 //! use futures_util::pin_mut; 31 //! use futures_util::stream::StreamExt; 32 //! 33 //! #[tokio::main] 34 //! async fn main() { 35 //! let s = stream! { 36 //! for i in 0..3 { 37 //! yield i; 38 //! } 39 //! }; 40 //! 41 //! pin_mut!(s); // needed for iteration 42 //! 43 //! while let Some(value) = s.next().await { 44 //! println!("got {}", value); 45 //! } 46 //! } 47 //! ``` 48 //! 49 //! Streams may be returned by using `impl Stream<Item = T>`: 50 //! 51 //! ```rust 52 //! use async_stream::stream; 53 //! 54 //! use futures_core::stream::Stream; 55 //! use futures_util::pin_mut; 56 //! use futures_util::stream::StreamExt; 57 //! 58 //! fn zero_to_three() -> impl Stream<Item = u32> { 59 //! stream! { 60 //! for i in 0..3 { 61 //! yield i; 62 //! } 63 //! } 64 //! } 65 //! 66 //! #[tokio::main] 67 //! async fn main() { 68 //! let s = zero_to_three(); 69 //! pin_mut!(s); // needed for iteration 70 //! 71 //! while let Some(value) = s.next().await { 72 //! println!("got {}", value); 73 //! } 74 //! } 75 //! ``` 76 //! 77 //! Streams may be implemented in terms of other streams - `async-stream` provides `for await` 78 //! syntax to assist with this: 79 //! 80 //! ```rust 81 //! use async_stream::stream; 82 //! 83 //! use futures_core::stream::Stream; 84 //! use futures_util::pin_mut; 85 //! use futures_util::stream::StreamExt; 86 //! 87 //! fn zero_to_three() -> impl Stream<Item = u32> { 88 //! stream! { 89 //! for i in 0..3 { 90 //! yield i; 91 //! } 92 //! } 93 //! } 94 //! 95 //! fn double<S: Stream<Item = u32>>(input: S) 96 //! -> impl Stream<Item = u32> 97 //! { 98 //! stream! { 99 //! for await value in input { 100 //! yield value * 2; 101 //! } 102 //! } 103 //! } 104 //! 105 //! #[tokio::main] 106 //! async fn main() { 107 //! let s = double(zero_to_three()); 108 //! pin_mut!(s); // needed for iteration 109 //! 110 //! while let Some(value) = s.next().await { 111 //! println!("got {}", value); 112 //! } 113 //! } 114 //! ``` 115 //! 116 //! Rust try notation (`?`) can be used with the `try_stream!` macro. The `Item` 117 //! of the returned stream is `Result` with `Ok` being the value yielded and 118 //! `Err` the error type returned by `?`. 119 //! 120 //! ```rust 121 //! use tokio::net::{TcpListener, TcpStream}; 122 //! 123 //! use async_stream::try_stream; 124 //! use futures_core::stream::Stream; 125 //! 126 //! use std::io; 127 //! use std::net::SocketAddr; 128 //! 129 //! fn bind_and_accept(addr: SocketAddr) 130 //! -> impl Stream<Item = io::Result<TcpStream>> 131 //! { 132 //! try_stream! { 133 //! let mut listener = TcpListener::bind(addr).await?; 134 //! 135 //! loop { 136 //! let (stream, addr) = listener.accept().await?; 137 //! println!("received on {:?}", addr); 138 //! yield stream; 139 //! } 140 //! } 141 //! } 142 //! ``` 143 //! 144 //! # Implementation 145 //! 146 //! The `stream!` and `try_stream!` macros are implemented using proc macros. 147 //! The macro searches the syntax tree for instances of `sender.send($expr)` and 148 //! transforms them into `sender.send($expr).await`. 149 //! 150 //! The stream uses a lightweight sender to send values from the stream 151 //! implementation to the caller. When entering the stream, an `Option<T>` is 152 //! stored on the stack. A pointer to the cell is stored in a thread local and 153 //! `poll` is called on the async block. When `poll` returns. 154 //! `sender.send(value)` stores the value that cell and yields back to the 155 //! caller. 156 //! 157 //! [`Stream`]: https://docs.rs/futures-core/*/futures_core/stream/trait.Stream.html 158 159 mod async_stream; 160 mod next; 161 #[doc(hidden)] 162 pub mod yielder; 163 164 // Used by the macro, but not intended to be accessed publicly. 165 #[doc(hidden)] 166 pub use crate::async_stream::AsyncStream; 167 168 #[doc(hidden)] 169 pub use async_stream_impl; 170 171 /// Asynchronous stream 172 /// 173 /// See [crate](index.html) documentation for more details. 174 /// 175 /// # Examples 176 /// 177 /// ``` 178 /// use async_stream::stream; 179 /// 180 /// use futures_util::pin_mut; 181 /// use futures_util::stream::StreamExt; 182 /// 183 /// #[tokio::main] 184 /// async fn main() { 185 /// let s = stream! { 186 /// for i in 0..3 { 187 /// yield i; 188 /// } 189 /// }; 190 /// 191 /// pin_mut!(s); // needed for iteration 192 /// 193 /// while let Some(value) = s.next().await { 194 /// println!("got {}", value); 195 /// } 196 /// } 197 /// ``` 198 #[macro_export] 199 macro_rules! stream { 200 ($($tt:tt)*) => { 201 $crate::async_stream_impl::stream_inner!(($crate) $($tt)*) 202 } 203 } 204 205 /// Asynchronous fallible stream 206 /// 207 /// See [crate](index.html) documentation for more details. 208 /// 209 /// # Examples 210 /// 211 /// ``` 212 /// use tokio::net::{TcpListener, TcpStream}; 213 /// 214 /// use async_stream::try_stream; 215 /// use futures_core::stream::Stream; 216 /// 217 /// use std::io; 218 /// use std::net::SocketAddr; 219 /// 220 /// fn bind_and_accept(addr: SocketAddr) 221 /// -> impl Stream<Item = io::Result<TcpStream>> 222 /// { 223 /// try_stream! { 224 /// let mut listener = TcpListener::bind(addr).await?; 225 /// 226 /// loop { 227 /// let (stream, addr) = listener.accept().await?; 228 /// println!("received on {:?}", addr); 229 /// yield stream; 230 /// } 231 /// } 232 /// } 233 /// ``` 234 #[macro_export] 235 macro_rules! try_stream { 236 ($($tt:tt)*) => { 237 $crate::async_stream_impl::try_stream_inner!(($crate) $($tt)*) 238 } 239 } 240 241 #[doc(hidden)] 242 pub mod reexport { 243 #[doc(hidden)] 244 pub use crate::next::next; 245 } 246