• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::{Instrument, Instrumented, WithDispatch};
2 use futures_01::{
3     future::{ExecuteError, Executor},
4     Future,
5 };
6 
/// Converts the error side of an `execute` result that carries a wrapped
/// future back into an error carrying the bare inner future.
///
/// The expression must evaluate to a `Result` whose error exposes `kind()`
/// and `into_future()`, and whose recovered future has an `inner` field. The
/// success value is passed through unchanged; on failure a new
/// `ExecuteError` is built from the original kind and the unwrapped future.
macro_rules! deinstrument_err {
    ($result:expr) => {
        match $result {
            Ok(value) => Ok(value),
            Err(wrapped) => {
                // Read the kind first: `into_future` consumes the error.
                let kind = wrapped.kind();
                let future = wrapped.into_future().inner;
                Err(ExecuteError::new(kind, future))
            }
        }
    };
}
16 
17 impl<T, F> Executor<F> for Instrumented<T>
18 where
19     T: Executor<Instrumented<F>>,
20     F: Future<Item = (), Error = ()>,
21 {
execute(&self, future: F) -> Result<(), ExecuteError<F>>22     fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
23         let future = future.instrument(self.span.clone());
24         deinstrument_err!(self.inner.execute(future))
25     }
26 }
27 
28 impl<T, F> Executor<F> for WithDispatch<T>
29 where
30     T: Executor<WithDispatch<F>>,
31     F: Future<Item = (), Error = ()>,
32 {
execute(&self, future: F) -> Result<(), ExecuteError<F>>33     fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
34         let future = self.with_dispatch(future);
35         deinstrument_err!(self.inner.execute(future))
36     }
37 }
38 
39 #[cfg(feature = "tokio")]
40 #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
41 pub use self::tokio::*;
42 
43 #[cfg(feature = "tokio")]
44 mod tokio {
45     use crate::{Instrument, Instrumented, WithDispatch};
46     use futures_01::Future;
47     use tokio_01::{
48         executor::{Executor, SpawnError, TypedExecutor},
49         runtime::{current_thread, Runtime, TaskExecutor},
50     };
51 
52     impl<T> Executor for Instrumented<T>
53     where
54         T: Executor,
55     {
spawn( &mut self, future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>, ) -> Result<(), SpawnError>56         fn spawn(
57             &mut self,
58             future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
59         ) -> Result<(), SpawnError> {
60             // TODO: get rid of double box somehow?
61             let future = Box::new(future.instrument(self.span.clone()));
62             self.inner.spawn(future)
63         }
64     }
65 
66     impl<T, F> TypedExecutor<F> for Instrumented<T>
67     where
68         T: TypedExecutor<Instrumented<F>>,
69     {
spawn(&mut self, future: F) -> Result<(), SpawnError>70         fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
71             self.inner.spawn(future.instrument(self.span.clone()))
72         }
73 
status(&self) -> Result<(), SpawnError>74         fn status(&self) -> Result<(), SpawnError> {
75             self.inner.status()
76         }
77     }
78 
79     impl Instrumented<Runtime> {
80         /// Spawn an instrumented future onto the Tokio runtime.
81         ///
82         /// This spawns the given future onto the runtime's executor, usually a
83         /// thread pool. The thread pool is then responsible for polling the
84         /// future until it completes.
85         ///
86         /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
87         /// instrumenting the spawned future beforehand.
spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + Send + 'static,88         pub fn spawn<F>(&mut self, future: F) -> &mut Self
89         where
90             F: Future<Item = (), Error = ()> + Send + 'static,
91         {
92             let future = future.instrument(self.span.clone());
93             self.inner.spawn(future);
94             self
95         }
96 
97         /// Run an instrumented future to completion on the Tokio runtime.
98         ///
99         /// This runs the given future on the runtime, blocking until it is
100         /// complete, and yielding its resolved result. Any tasks or timers which
101         /// the future spawns internally will be executed on the runtime.
102         ///
103         /// This method should not be called from an asynchronous context.
104         ///
105         /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
106         /// instrumenting the spawned future beforehand.
107         ///
108         /// # Panics
109         ///
110         /// This function panics if the executor is at capacity, if the provided
111         /// future panics, or if called within an asynchronous execution context.
block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: Send + 'static + Future<Item = R, Error = E>, R: Send + 'static, E: Send + 'static,112         pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
113         where
114             F: Send + 'static + Future<Item = R, Error = E>,
115             R: Send + 'static,
116             E: Send + 'static,
117         {
118             let future = future.instrument(self.span.clone());
119             self.inner.block_on(future)
120         }
121 
122         /// Return an instrumented handle to the runtime's executor.
123         ///
124         /// The returned handle can be used to spawn tasks that run on this runtime.
125         ///
126         /// The instrumented handle functions identically to a
127         /// `tokio::runtime::TaskExecutor`, but instruments the spawned
128         /// futures prior to spawning them.
executor(&self) -> Instrumented<TaskExecutor>129         pub fn executor(&self) -> Instrumented<TaskExecutor> {
130             self.inner.executor().instrument(self.span.clone())
131         }
132     }
133 
134     impl Instrumented<current_thread::Runtime> {
135         /// Spawn an instrumented future onto the single-threaded Tokio runtime.
136         ///
137         /// This method simply wraps a call to `current_thread::Runtime::spawn`,
138         /// instrumenting the spawned future beforehand.
spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + 'static,139         pub fn spawn<F>(&mut self, future: F) -> &mut Self
140         where
141             F: Future<Item = (), Error = ()> + 'static,
142         {
143             let future = future.instrument(self.span.clone());
144             self.inner.spawn(future);
145             self
146         }
147 
148         /// Instruments and runs the provided future, blocking the current thread
149         /// until the future completes.
150         ///
151         /// This function can be used to synchronously block the current thread
152         /// until the provided `future` has resolved either successfully or with an
153         /// error. The result of the future is then returned from this function
154         /// call.
155         ///
156         /// Note that this function will **also** execute any spawned futures on the
157         /// current thread, but will **not** block until these other spawned futures
158         /// have completed. Once the function returns, any uncompleted futures
159         /// remain pending in the `Runtime` instance. These futures will not run
160         /// until `block_on` or `run` is called again.
161         ///
162         /// The caller is responsible for ensuring that other spawned futures
163         /// complete execution by calling `block_on` or `run`.
164         ///
165         /// This method simply wraps a call to `current_thread::Runtime::block_on`,
166         /// instrumenting the spawned future beforehand.
167         ///
168         /// # Panics
169         ///
170         /// This function panics if the executor is at capacity, if the provided
171         /// future panics, or if called within an asynchronous execution context.
block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: 'static + Future<Item = R, Error = E>, R: 'static, E: 'static,172         pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
173         where
174             F: 'static + Future<Item = R, Error = E>,
175             R: 'static,
176             E: 'static,
177         {
178             let future = future.instrument(self.span.clone());
179             self.inner.block_on(future)
180         }
181 
182         /// Get a new instrumented handle to spawn futures on the single-threaded
183         /// Tokio runtime
184         ///
185         /// Different to the runtime itself, the handle can be sent to different
186         /// threads.
187         ///
188         /// The instrumented handle functions identically to a
189         /// `tokio::runtime::current_thread::Handle`, but instruments the spawned
190         /// futures prior to spawning them.
handle(&self) -> Instrumented<current_thread::Handle>191         pub fn handle(&self) -> Instrumented<current_thread::Handle> {
192             self.inner.handle().instrument(self.span.clone())
193         }
194     }
195 
196     impl<T> Executor for WithDispatch<T>
197     where
198         T: Executor,
199     {
spawn( &mut self, future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>, ) -> Result<(), SpawnError>200         fn spawn(
201             &mut self,
202             future: Box<dyn Future<Error = (), Item = ()> + 'static + Send>,
203         ) -> Result<(), SpawnError> {
204             // TODO: get rid of double box?
205             let future = Box::new(self.with_dispatch(future));
206             self.inner.spawn(future)
207         }
208     }
209 
210     impl<T, F> TypedExecutor<F> for WithDispatch<T>
211     where
212         T: TypedExecutor<WithDispatch<F>>,
213     {
spawn(&mut self, future: F) -> Result<(), SpawnError>214         fn spawn(&mut self, future: F) -> Result<(), SpawnError> {
215             self.inner.spawn(self.with_dispatch(future))
216         }
217 
status(&self) -> Result<(), SpawnError>218         fn status(&self) -> Result<(), SpawnError> {
219             self.inner.status()
220         }
221     }
222 
223     impl WithDispatch<Runtime> {
224         /// Spawn a future onto the Tokio runtime, in the context of this
225         /// `WithDispatch`'s trace dispatcher.
226         ///
227         /// This spawns the given future onto the runtime's executor, usually a
228         /// thread pool. The thread pool is then responsible for polling the
229         /// future until it completes.
230         ///
231         /// This method simply wraps a call to `tokio::runtime::Runtime::spawn`,
232         /// instrumenting the spawned future beforehand.
spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + Send + 'static,233         pub fn spawn<F>(&mut self, future: F) -> &mut Self
234         where
235             F: Future<Item = (), Error = ()> + Send + 'static,
236         {
237             let future = self.with_dispatch(future);
238             self.inner.spawn(future);
239             self
240         }
241 
242         /// Run a future to completion on the Tokio runtime, in the context of this
243         /// `WithDispatch`'s trace dispatcher.
244         ///
245         /// This runs the given future on the runtime, blocking until it is
246         /// complete, and yielding its resolved result. Any tasks or timers which
247         /// the future spawns internally will be executed on the runtime.
248         ///
249         /// This method should not be called from an asynchronous context.
250         ///
251         /// This method simply wraps a call to `tokio::runtime::Runtime::block_on`,
252         /// instrumenting the spawned future beforehand.
253         ///
254         /// # Panics
255         ///
256         /// This function panics if the executor is at capacity, if the provided
257         /// future panics, or if called within an asynchronous execution context.
block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: Send + 'static + Future<Item = R, Error = E>, R: Send + 'static, E: Send + 'static,258         pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
259         where
260             F: Send + 'static + Future<Item = R, Error = E>,
261             R: Send + 'static,
262             E: Send + 'static,
263         {
264             let future = self.with_dispatch(future);
265             self.inner.block_on(future)
266         }
267 
268         /// Return a handle to the runtime's executor, in the context of this
269         /// `WithDispatch`'s trace dispatcher.
270         ///
271         /// The returned handle can be used to spawn tasks that run on this runtime.
272         ///
273         /// The instrumented handle functions identically to a
274         /// `tokio::runtime::TaskExecutor`, but instruments the spawned
275         /// futures prior to spawning them.
executor(&self) -> WithDispatch<TaskExecutor>276         pub fn executor(&self) -> WithDispatch<TaskExecutor> {
277             self.with_dispatch(self.inner.executor())
278         }
279     }
280 
281     impl WithDispatch<current_thread::Runtime> {
282         /// Spawn a future onto the single-threaded Tokio runtime, in the context
283         /// of this `WithDispatch`'s trace dispatcher.
284         ///
285         /// This method simply wraps a call to `current_thread::Runtime::spawn`,
286         /// instrumenting the spawned future beforehand.
spawn<F>(&mut self, future: F) -> &mut Self where F: Future<Item = (), Error = ()> + 'static,287         pub fn spawn<F>(&mut self, future: F) -> &mut Self
288         where
289             F: Future<Item = (), Error = ()> + 'static,
290         {
291             let future = self.with_dispatch(future);
292             self.inner.spawn(future);
293             self
294         }
295 
296         /// Runs the provided future in the context of this `WithDispatch`'s trace
297         /// dispatcher, blocking the current thread until the future completes.
298         ///
299         /// This function can be used to synchronously block the current thread
300         /// until the provided `future` has resolved either successfully or with an
301         /// error. The result of the future is then returned from this function
302         /// call.
303         ///
304         /// Note that this function will **also** execute any spawned futures on the
305         /// current thread, but will **not** block until these other spawned futures
306         /// have completed. Once the function returns, any uncompleted futures
307         /// remain pending in the `Runtime` instance. These futures will not run
308         /// until `block_on` or `run` is called again.
309         ///
310         /// The caller is responsible for ensuring that other spawned futures
311         /// complete execution by calling `block_on` or `run`.
312         ///
313         /// This method simply wraps a call to `current_thread::Runtime::block_on`,
314         /// instrumenting the spawned future beforehand.
315         ///
316         /// # Panics
317         ///
318         /// This function panics if the executor is at capacity, if the provided
319         /// future panics, or if called within an asynchronous execution context.
block_on<F, R, E>(&mut self, future: F) -> Result<R, E> where F: 'static + Future<Item = R, Error = E>, R: 'static, E: 'static,320         pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
321         where
322             F: 'static + Future<Item = R, Error = E>,
323             R: 'static,
324             E: 'static,
325         {
326             let future = self.with_dispatch(future);
327             self.inner.block_on(future)
328         }
329 
330         /// Get a new handle to spawn futures on the single-threaded Tokio runtime,
331         /// in the context of this `WithDispatch`'s trace dispatcher.\
332         ///
333         /// Different to the runtime itself, the handle can be sent to different
334         /// threads.
335         ///
336         /// The instrumented handle functions identically to a
337         /// `tokio::runtime::current_thread::Handle`, but the spawned
338         /// futures are run in the context of the trace dispatcher.
handle(&self) -> WithDispatch<current_thread::Handle>339         pub fn handle(&self) -> WithDispatch<current_thread::Handle> {
340             self.with_dispatch(self.inner.handle())
341         }
342     }
343 }
344