1 use crate::{Instrument, Instrumented, WithDispatch}; 2 use futures_01::{ 3 future::{ExecuteError, Executor}, 4 Future, 5 }; 6 7 macro_rules! deinstrument_err { 8 ($e:expr) => { 9 $e.map_err(|e| { 10 let kind = e.kind(); 11 let future = e.into_future().inner; 12 ExecuteError::new(kind, future) 13 }) 14 }; 15 } 16 17 impl<T, F> Executor<F> for Instrumented<T> 18 where 19 T: Executor<Instrumented<F>>, 20 F: Future<Item = (), Error = ()>, 21 { execute(&self, future: F) -> Result<(), ExecuteError<F>>22 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { 23 let future = future.instrument(self.span.clone()); 24 deinstrument_err!(self.inner.execute(future)) 25 } 26 } 27 28 impl<T, F> Executor<F> for WithDispatch<T> 29 where 30 T: Executor<WithDispatch<F>>, 31 F: Future<Item = (), Error = ()>, 32 { execute(&self, future: F) -> Result<(), ExecuteError<F>>33 fn execute(&self, future: F) -> Result<(), ExecuteError<F>> { 34 let future = self.with_dispatch(future); 35 deinstrument_err!(self.inner.execute(future)) 36 } 37 } 38 39 #[cfg(feature = "tokio")] 40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 41 pub use self::tokio::*; 42 43 #[cfg(feature = "tokio")] 44 mod tokio { 45 use crate::{Instrument, Instrumented, WithDispatch}; 46 use futures_01::Future; 47 use tokio_01::{ 48 executor::{Executor, SpawnError, TypedExecutor}, 49 runtime::{current_thread, Runtime, TaskExecutor}, 50 }; 51 52 impl<T> Executor for Instrumented<T> 53 where 54 T: Executor, 55 { spawn( &mut self, future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>, ) -> Result<(), SpawnError>56 fn spawn( 57 &mut self, 58 future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>, 59 ) -> Result<(), SpawnError> { 60 // TODO: get rid of double box somehow? 61 let future = Box::new(future.instrument(self.span.clone())); 62 self.inner.spawn(future) 63 } 64 } 65 66 impl<T, F> TypedExecutor<F> for Instrumented<T> 67 where 68 T: TypedExecutor<Instrumented<F>>, 69 { spawn(&mut self, future: F) -> Result<(), SpawnError>70 fn spawn(&mut self, future: F) -> Result<(), SpawnError> { 71 self.inner.spawn(future.instrument(self.span.clone())) 72 } 73 status(&self) -> Result<(), SpawnError>74 fn status(&self) -> Result<(), SpawnError> { 75 self.inner.status() 76 } 77 } 78 79 impl Instrumented<Runtime> { 80 /// Spawn an instrumented future onto the Tokio runtime. 81 /// 82 /// This spawns the given future onto the runtime's executor, usually a 83 /// thread pool. The thread pool is then responsible for polling the 84 /// future until it completes. 85 /// 86 /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`, 87 /// instrumenting the spawned future beforehand. spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + Send + 'static,88 pub fn spawn<F>(&mut self, future: F) -> &mut Self 89 where 90 F: Future<Item = (), Error = ()> + Send + 'static, 91 { 92 let future = future.instrument(self.span.clone()); 93 self.inner.spawn(future); 94 self 95 } 96 97 /// Run an instrumented future to completion on the Tokio runtime. 98 /// 99 /// This runs the given future on the runtime, blocking until it is 100 /// complete, and yielding its resolved result. Any tasks or timers which 101 /// the future spawns internally will be executed on the runtime. 102 /// 103 /// This method should not be called from an asynchronous context. 104 /// 105 /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`, 106 /// instrumenting the spawned future beforehand. 107 /// 108 /// # Panics 109 /// 110 /// This function panics if the executor is at capacity, if the provided 111 /// future panics, or if called within an asynchronous execution context. block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: Send + 'static + Future<Item = R, Error = E>, R: Send + 'static, E: Send + 'static,112 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> 113 where 114 F: Send + 'static + Future<Item = R, Error = E>, 115 R: Send + 'static, 116 E: Send + 'static, 117 { 118 let future = future.instrument(self.span.clone()); 119 self.inner.block_on(future) 120 } 121 122 /// Return an instrumented handle to the runtime's executor. 123 /// 124 /// The returned handle can be used to spawn tasks that run on this runtime. 125 /// 126 /// The instrumented handle functions identically to a 127 /// `tokio::runtime::TaskExecutor`, but instruments the spawned 128 /// futures prior to spawning them. executor(&self) -> Instrumented<TaskExecutor>129 pub fn executor(&self) -> Instrumented<TaskExecutor> { 130 self.inner.executor().instrument(self.span.clone()) 131 } 132 } 133 134 impl Instrumented<current_thread::Runtime> { 135 /// Spawn an instrumented future onto the single-threaded Tokio runtime. 136 /// 137 /// This method simply wraps a call to `current_thread::Runtime::spawn`, 138 /// instrumenting the spawned future beforehand. spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + 'static,139 pub fn spawn<F>(&mut self, future: F) -> &mut Self 140 where 141 F: Future<Item = (), Error = ()> + 'static, 142 { 143 let future = future.instrument(self.span.clone()); 144 self.inner.spawn(future); 145 self 146 } 147 148 /// Instruments and runs the provided future, blocking the current thread 149 /// until the future completes. 150 /// 151 /// This function can be used to synchronously block the current thread 152 /// until the provided `future` has resolved either successfully or with an 153 /// error. The result of the future is then returned from this function 154 /// call. 155 /// 156 /// Note that this function will **also** execute any spawned futures on the 157 /// current thread, but will **not** block until these other spawned futures 158 /// have completed. Once the function returns, any uncompleted futures 159 /// remain pending in the `Runtime` instance. These futures will not run 160 /// until `block_on` or `run` is called again. 161 /// 162 /// The caller is responsible for ensuring that other spawned futures 163 /// complete execution by calling `block_on` or `run`. 164 /// 165 /// This method simply wraps a call to `current_thread::Runtime::block_on`, 166 /// instrumenting the spawned future beforehand. 167 /// 168 /// # Panics 169 /// 170 /// This function panics if the executor is at capacity, if the provided 171 /// future panics, or if called within an asynchronous execution context. block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: 'static + Future<Item = R, Error = E>, R: 'static, E: 'static,172 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> 173 where 174 F: 'static + Future<Item = R, Error = E>, 175 R: 'static, 176 E: 'static, 177 { 178 let future = future.instrument(self.span.clone()); 179 self.inner.block_on(future) 180 } 181 182 /// Get a new instrumented handle to spawn futures on the single-threaded 183 /// Tokio runtime 184 /// 185 /// Different to the runtime itself, the handle can be sent to different 186 /// threads. 187 /// 188 /// The instrumented handle functions identically to a 189 /// `tokio::runtime::current_thread::Handle`, but instruments the spawned 190 /// futures prior to spawning them. handle(&self) -> Instrumented<current_thread::Handle>191 pub fn handle(&self) -> Instrumented<current_thread::Handle> { 192 self.inner.handle().instrument(self.span.clone()) 193 } 194 } 195 196 impl<T> Executor for WithDispatch<T> 197 where 198 T: Executor, 199 { spawn( &mut self, future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>, ) -> Result<(), SpawnError>200 fn spawn( 201 &mut self, 202 future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>, 203 ) -> Result<(), SpawnError> { 204 // TODO: get rid of double box? 205 let future = Box::new(self.with_dispatch(future)); 206 self.inner.spawn(future) 207 } 208 } 209 210 impl<T, F> TypedExecutor<F> for WithDispatch<T> 211 where 212 T: TypedExecutor<WithDispatch<F>>, 213 { spawn(&mut self, future: F) -> Result<(), SpawnError>214 fn spawn(&mut self, future: F) -> Result<(), SpawnError> { 215 self.inner.spawn(self.with_dispatch(future)) 216 } 217 status(&self) -> Result<(), SpawnError>218 fn status(&self) -> Result<(), SpawnError> { 219 self.inner.status() 220 } 221 } 222 223 impl WithDispatch<Runtime> { 224 /// Spawn a future onto the Tokio runtime, in the context of this 225 /// `WithDispatch`'s trace dispatcher. 226 /// 227 /// This spawns the given future onto the runtime's executor, usually a 228 /// thread pool. The thread pool is then responsible for polling the 229 /// future until it completes. 230 /// 231 /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`, 232 /// instrumenting the spawned future beforehand. spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + Send + 'static,233 pub fn spawn<F>(&mut self, future: F) -> &mut Self 234 where 235 F: Future<Item = (), Error = ()> + Send + 'static, 236 { 237 let future = self.with_dispatch(future); 238 self.inner.spawn(future); 239 self 240 } 241 242 /// Run a future to completion on the Tokio runtime, in the context of this 243 /// `WithDispatch`'s trace dispatcher. 244 /// 245 /// This runs the given future on the runtime, blocking until it is 246 /// complete, and yielding its resolved result. Any tasks or timers which 247 /// the future spawns internally will be executed on the runtime. 248 /// 249 /// This method should not be called from an asynchronous context. 250 /// 251 /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`, 252 /// instrumenting the spawned future beforehand. 253 /// 254 /// # Panics 255 /// 256 /// This function panics if the executor is at capacity, if the provided 257 /// future panics, or if called within an asynchronous execution context. block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: Send + 'static + Future<Item = R, Error = E>, R: Send + 'static, E: Send + 'static,258 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> 259 where 260 F: Send + 'static + Future<Item = R, Error = E>, 261 R: Send + 'static, 262 E: Send + 'static, 263 { 264 let future = self.with_dispatch(future); 265 self.inner.block_on(future) 266 } 267 268 /// Return a handle to the runtime's executor, in the context of this 269 /// `WithDispatch`'s trace dispatcher. 270 /// 271 /// The returned handle can be used to spawn tasks that run on this runtime. 272 /// 273 /// The instrumented handle functions identically to a 274 /// `tokio::runtime::TaskExecutor`, but instruments the spawned 275 /// futures prior to spawning them. executor(&self) -> WithDispatch<TaskExecutor>276 pub fn executor(&self) -> WithDispatch<TaskExecutor> { 277 self.with_dispatch(self.inner.executor()) 278 } 279 } 280 281 impl WithDispatch<current_thread::Runtime> { 282 /// Spawn a future onto the single-threaded Tokio runtime, in the context 283 /// of this `WithDispatch`'s trace dispatcher. 284 /// 285 /// This method simply wraps a call to `current_thread::Runtime::spawn`, 286 /// instrumenting the spawned future beforehand. spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + 'static,287 pub fn spawn<F>(&mut self, future: F) -> &mut Self 288 where 289 F: Future<Item = (), Error = ()> + 'static, 290 { 291 let future = self.with_dispatch(future); 292 self.inner.spawn(future); 293 self 294 } 295 296 /// Runs the provided future in the context of this `WithDispatch`'s trace 297 /// dispatcher, blocking the current thread until the future completes. 298 /// 299 /// This function can be used to synchronously block the current thread 300 /// until the provided `future` has resolved either successfully or with an 301 /// error. The result of the future is then returned from this function 302 /// call. 303 /// 304 /// Note that this function will **also** execute any spawned futures on the 305 /// current thread, but will **not** block until these other spawned futures 306 /// have completed. Once the function returns, any uncompleted futures 307 /// remain pending in the `Runtime` instance. These futures will not run 308 /// until `block_on` or `run` is called again. 309 /// 310 /// The caller is responsible for ensuring that other spawned futures 311 /// complete execution by calling `block_on` or `run`. 312 /// 313 /// This method simply wraps a call to `current_thread::Runtime::block_on`, 314 /// instrumenting the spawned future beforehand. 315 /// 316 /// # Panics 317 /// 318 /// This function panics if the executor is at capacity, if the provided 319 /// future panics, or if called within an asynchronous execution context. block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: 'static + Future<Item = R, Error = E>, R: 'static, E: 'static,320 pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E> 321 where 322 F: 'static + Future<Item = R, Error = E>, 323 R: 'static, 324 E: 'static, 325 { 326 let future = self.with_dispatch(future); 327 self.inner.block_on(future) 328 } 329 330 /// Get a new handle to spawn futures on the single-threaded Tokio runtime, 331 /// in the context of this `WithDispatch`'s trace dispatcher.\ 332 /// 333 /// Different to the runtime itself, the handle can be sent to different 334 /// threads. 335 /// 336 /// The instrumented handle functions identically to a 337 /// `tokio::runtime::current_thread::Handle`, but the spawned 338 /// futures are run in the context of the trace dispatcher. handle(&self) -> WithDispatch<current_thread::Handle>339 pub fn handle(&self) -> WithDispatch<current_thread::Handle> { 340 self.with_dispatch(self.inner.handle()) 341 } 342 } 343 } 344