• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::runtime::blocking::{BlockingTask, NoopSchedule};
2 use crate::runtime::task::{self, JoinHandle};
3 use crate::runtime::{blocking, context, driver, Spawner};
4 use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
5 
6 use std::future::Future;
7 use std::marker::PhantomData;
8 use std::{error, fmt};
9 
/// Handle to the runtime.
///
/// The handle is internally reference-counted and can be freely cloned. A handle can be
/// obtained using the [`Runtime::handle`] method.
///
/// Besides the task spawner, a handle carries one handle per runtime driver
/// (I/O, signal, time), the clock, and the blocking pool spawner.
///
/// [`Runtime::handle`]: crate::runtime::Runtime::handle()
#[derive(Debug, Clone)]
pub struct Handle {
    /// Spawner used to schedule futures onto the runtime (see `Handle::spawn`).
    pub(super) spawner: Spawner,

    /// Handles to the I/O drivers
    // The field is unused unless one of the listed features is enabled, so the
    // `dead_code` lint is silenced for the remaining configurations.
    #[cfg_attr(
        not(any(feature = "net", feature = "process", all(unix, feature = "signal"))),
        allow(dead_code)
    )]
    pub(super) io_handle: driver::IoHandle,

    /// Handles to the signal drivers
    // Same reasoning as for `io_handle`: only some feature sets read this field.
    #[cfg_attr(
        not(any(feature = "signal", all(unix, feature = "process"))),
        allow(dead_code)
    )]
    pub(super) signal_handle: driver::SignalHandle,

    /// Handles to the time drivers
    #[cfg_attr(not(feature = "time"), allow(dead_code))]
    pub(super) time_handle: driver::TimeHandle,

    /// Source of `Instant::now()`
    #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))]
    pub(super) clock: driver::Clock,

    /// Blocking pool spawner (used by `Handle::spawn_blocking`).
    pub(super) blocking_spawner: blocking::Spawner,
}
45 
/// Runtime context guard.
///
/// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits
/// the runtime context on drop.
///
/// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter
#[derive(Debug)]
#[must_use = "Creating and dropping a guard does nothing"]
pub struct EnterGuard<'a> {
    // The guard produced by `context::enter`; held only so that it is dropped
    // together with this wrapper.
    _guard: context::EnterGuard,
    // Zero-sized marker that ties the guard's lifetime `'a` to a borrow of the
    // `Handle`, without actually storing a reference.
    _handle_lifetime: PhantomData<&'a Handle>,
}
58 
59 impl Handle {
60     /// Enters the runtime context. This allows you to construct types that must
61     /// have an executor available on creation such as [`Sleep`] or [`TcpStream`].
62     /// It will also allow you to call methods such as [`tokio::spawn`].
63     ///
64     /// [`Sleep`]: struct@crate::time::Sleep
65     /// [`TcpStream`]: struct@crate::net::TcpStream
66     /// [`tokio::spawn`]: fn@crate::spawn
enter(&self) -> EnterGuard<'_>67     pub fn enter(&self) -> EnterGuard<'_> {
68         EnterGuard {
69             _guard: context::enter(self.clone()),
70             _handle_lifetime: PhantomData,
71         }
72     }
73 
    /// Returns a `Handle` view over the currently running `Runtime`.
    ///
    /// # Panics
    ///
    /// This will panic if called outside the context of a Tokio runtime. That means that you must
    /// call this on one of the threads **being run by the runtime**. Calling this from within a
    /// thread created by `std::thread::spawn` (for example) will cause a panic.
    ///
    /// # Examples
    ///
    /// This can be used to obtain the handle of the surrounding runtime from an async
    /// block or function running on that runtime.
    ///
    /// ```
    /// # use std::thread;
    /// # use tokio::runtime::Runtime;
    /// # fn dox() {
    /// # let rt = Runtime::new().unwrap();
    /// # rt.spawn(async {
    /// use tokio::runtime::Handle;
    ///
    /// // Inside an async block or function.
    /// let handle = Handle::current();
    /// handle.spawn(async {
    ///     println!("now running in the existing Runtime");
    /// });
    ///
    /// # let handle =
    /// thread::spawn(move || {
    ///     // Notice that the handle is created outside of this thread and then moved in
    ///     handle.spawn(async { /* ... */ })
    ///     // This next line would cause a panic
    ///     // let handle2 = Handle::current();
    /// });
    /// # handle.join().unwrap();
    /// # });
    /// # }
    /// ```
    pub fn current() -> Self {
        context::current()
    }
115 
    /// Returns a `Handle` view over the currently running `Runtime`.
    ///
    /// Returns an error if no `Runtime` has been started. The returned
    /// [`TryCurrentError`] can also report that the Tokio context thread-local
    /// has been destroyed; use its `is_*` methods to tell the cases apart.
    ///
    /// Contrary to `current`, this never panics.
    pub fn try_current() -> Result<Self, TryCurrentError> {
        context::try_current()
    }
124 
    // NOTE(review): `cfg_stats!` presumably compiles this method only when
    // runtime stats support is enabled — confirm against the macro definition.
    cfg_stats! {
        /// Returns a view that lets you get information about how the runtime
        /// is performing.
        ///
        /// The view is borrowed from this handle's spawner.
        pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats {
            self.spawner.stats()
        }
    }
132 
    /// Spawns a future onto the Tokio runtime.
    ///
    /// This spawns the given future onto the runtime's executor, usually a
    /// thread pool. The thread pool is then responsible for polling the future
    /// until it completes.
    ///
    /// The returned [`JoinHandle`] carries the future's output type.
    ///
    /// See [module level][mod] documentation for more details.
    ///
    /// [mod]: index.html
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime::Runtime;
    ///
    /// # fn dox() {
    /// // Create the runtime
    /// let rt = Runtime::new().unwrap();
    /// // Get a handle from this runtime
    /// let handle = rt.handle();
    ///
    /// // Spawn a future onto the runtime using the handle
    /// handle.spawn(async {
    ///     println!("now running on a worker thread");
    /// });
    /// # }
    /// ```
    #[cfg_attr(tokio_track_caller, track_caller)]
    pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // When unstable tracing support is compiled in, the future is passed
        // through the tracing helper (labelled "task") before being spawned;
        // otherwise it is spawned unchanged.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let future = crate::util::trace::task(future, "task", None);
        self.spawner.spawn(future)
    }
170 
    /// Runs the provided function on an executor dedicated to blocking
    /// operations.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime::Runtime;
    ///
    /// # fn dox() {
    /// // Create the runtime
    /// let rt = Runtime::new().unwrap();
    /// // Get a handle from this runtime
    /// let handle = rt.handle();
    ///
    /// // Spawn a blocking function onto the runtime using the handle
    /// handle.spawn_blocking(|| {
    ///     println!("now running on a worker thread");
    /// });
    /// # }
    /// ```
    #[cfg_attr(tokio_track_caller, track_caller)]
    pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        // In builds with debug assertions, closures larger than 2048 bytes are
        // boxed before being handed on.
        // NOTE(review): presumably this keeps large closures off the stack in
        // unoptimized builds — confirm the motivation.
        if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 {
            self.spawn_blocking_inner(Box::new(func), None)
        } else {
            self.spawn_blocking_inner(func, None)
        }
    }
202 
    /// Wraps `func` in a `BlockingTask`, hands it to the blocking pool spawner
    /// and returns the `JoinHandle` for its result.
    ///
    /// `name` is only used as the `task.name` field of the tracing span that is
    /// created when unstable tracing support is compiled in; in all other
    /// configurations it is ignored.
    #[cfg_attr(tokio_track_caller, track_caller)]
    pub(crate) fn spawn_blocking_inner<F, R>(&self, func: F, name: Option<&str>) -> JoinHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        let fut = BlockingTask::new(func);

        // With tracing enabled, instrument the task with a "runtime.spawn" span.
        // The span additionally records the caller's source location when
        // `tokio_track_caller` is set.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let fut = {
            use tracing::Instrument;
            #[cfg(tokio_track_caller)]
            let location = std::panic::Location::caller();
            #[cfg(tokio_track_caller)]
            let span = tracing::trace_span!(
                target: "tokio::task::blocking",
                "runtime.spawn",
                kind = %"blocking",
                task.name = %name.unwrap_or_default(),
                "fn" = %std::any::type_name::<F>(),
                spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()),
            );
            #[cfg(not(tokio_track_caller))]
            let span = tracing::trace_span!(
                target: "tokio::task::blocking",
                "runtime.spawn",
                kind = %"blocking",
                task.name = %name.unwrap_or_default(),
                "fn" = %std::any::type_name::<F>(),
            );
            fut.instrument(span)
        };

        // Without tracing, `name` has no consumer; this avoids an unused
        // parameter warning.
        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let _ = name;

        let (task, handle) = task::unowned(fut, NoopSchedule);
        // The result of handing the task to the blocking pool is discarded.
        // NOTE(review): confirm that a rejected task is still observable
        // through the returned `JoinHandle`.
        let _ = self.blocking_spawner.spawn(task, self);
        handle
    }
243 
    /// Runs a future to completion on this `Handle`'s associated `Runtime`.
    ///
    /// This runs the given future on the current thread, blocking until it is
    /// complete, and yielding its resolved result. Any tasks or timers which
    /// the future spawns internally will be executed on the runtime.
    ///
    /// When this is used on a `current_thread` runtime, only the
    /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the
    /// `Handle::block_on` method cannot drive them. This means that, when using
    /// this method on a `current_thread` runtime, anything that relies on IO or
    /// timers will not work unless there is another thread currently calling
    /// [`Runtime::block_on`] on the same runtime.
    ///
    /// # If the runtime has been shut down
    ///
    /// If the `Handle`'s associated `Runtime` has been shut down (through
    /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by
    /// dropping it) and `Handle::block_on` is used it might return an error or
    /// panic. Specifically IO resources will return an error and timers will
    /// panic. Runtime independent futures will run as normal.
    ///
    /// # Panics
    ///
    /// This function panics if the provided future panics, if called within an
    /// asynchronous execution context, or if a timer future is executed on a
    /// runtime that has been shut down.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio::runtime::Runtime;
    ///
    /// // Create the runtime
    /// let rt  = Runtime::new().unwrap();
    ///
    /// // Get a handle from this runtime
    /// let handle = rt.handle();
    ///
    /// // Execute the future, blocking the current thread until completion
    /// handle.block_on(async {
    ///     println!("hello");
    /// });
    /// ```
    ///
    /// Or using `Handle::current`:
    ///
    /// ```
    /// use tokio::runtime::Handle;
    ///
    /// #[tokio::main]
    /// async fn main () {
    ///     let handle = Handle::current();
    ///     std::thread::spawn(move || {
    ///         // Using Handle::block_on to run async code in the new thread.
    ///         handle.block_on(async {
    ///             println!("hello");
    ///         });
    ///     });
    /// }
    /// ```
    ///
    /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on
    /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background
    /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout
    #[cfg_attr(tokio_track_caller, track_caller)]
    pub fn block_on<F: Future>(&self, future: F) -> F::Output {
        // With unstable tracing support, wrap the future in the tracing helper
        // (labelled "block_on").
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let future = crate::util::trace::task(future, "block_on", None);

        // Enter the **runtime** context. This configures spawning, the current I/O driver, ...
        // The guard must stay alive until the future has completed.
        let _rt_enter = self.enter();

        // Enter a **blocking** context. This prevents blocking from a runtime.
        let mut blocking_enter = crate::runtime::enter(true);

        // Block on the future
        blocking_enter
            .block_on(future)
            .expect("failed to park thread")
    }
330 
    /// Consumes the handle and shuts down its task spawner.
    pub(crate) fn shutdown(mut self) {
        self.spawner.shutdown();
    }
334 }
335 
/// Error returned by `try_current` when no Runtime has been started
///
/// The concrete cause can be queried with the `is_missing_context` and
/// `is_thread_local_destroyed` methods.
#[derive(Debug)]
pub struct TryCurrentError {
    // The specific reason the lookup failed; kept private so it is only
    // exposed through the `is_*` query methods.
    kind: TryCurrentErrorKind,
}
341 
342 impl TryCurrentError {
new_no_context() -> Self343     pub(crate) fn new_no_context() -> Self {
344         Self {
345             kind: TryCurrentErrorKind::NoContext,
346         }
347     }
348 
new_thread_local_destroyed() -> Self349     pub(crate) fn new_thread_local_destroyed() -> Self {
350         Self {
351             kind: TryCurrentErrorKind::ThreadLocalDestroyed,
352         }
353     }
354 
355     /// Returns true if the call failed because there is currently no runtime in
356     /// the Tokio context.
is_missing_context(&self) -> bool357     pub fn is_missing_context(&self) -> bool {
358         matches!(self.kind, TryCurrentErrorKind::NoContext)
359     }
360 
361     /// Returns true if the call failed because the Tokio context thread-local
362     /// had been destroyed. This can usually only happen if in the destructor of
363     /// other thread-locals.
is_thread_local_destroyed(&self) -> bool364     pub fn is_thread_local_destroyed(&self) -> bool {
365         matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed)
366     }
367 }
368 
/// The reason a `TryCurrentError` was produced.
enum TryCurrentErrorKind {
    /// There is currently no runtime in the Tokio context.
    NoContext,
    /// The Tokio context thread-local has been destroyed.
    ThreadLocalDestroyed,
}

impl fmt::Debug for TryCurrentErrorKind {
    /// Writes the bare variant name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let variant_name = match *self {
            Self::NoContext => "NoContext",
            Self::ThreadLocalDestroyed => "ThreadLocalDestroyed",
        };
        f.write_str(variant_name)
    }
}
383 
384 impl fmt::Display for TryCurrentError {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result385     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
386         use TryCurrentErrorKind::*;
387         match self.kind {
388             NoContext => f.write_str(CONTEXT_MISSING_ERROR),
389             ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR),
390         }
391     }
392 }
393 
// The provided `Error` methods are used unchanged: the empty impl overrides
// none of them, so no underlying source error is exposed.
impl error::Error for TryCurrentError {}
395