• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::runtime::blocking::BlockingPool;
2 use crate::runtime::scheduler::CurrentThread;
3 use crate::runtime::{context, EnterGuard, Handle};
4 use crate::task::JoinHandle;
5 
6 use std::future::Future;
7 use std::time::Duration;
8 
9 cfg_rt_multi_thread! {
10     use crate::runtime::Builder;
11     use crate::runtime::scheduler::MultiThread;
12 
13     cfg_unstable! {
14         use crate::runtime::scheduler::MultiThreadAlt;
15     }
16 }
17 
18 /// The Tokio runtime.
19 ///
20 /// The runtime provides an I/O driver, task scheduler, [timer], and
21 /// blocking pool, necessary for running asynchronous tasks.
22 ///
23 /// Instances of `Runtime` can be created using [`new`], or [`Builder`].
24 /// However, most users will use the `#[tokio::main]` annotation on their
25 /// entry point instead.
26 ///
27 /// See [module level][mod] documentation for more details.
28 ///
29 /// # Shutdown
30 ///
31 /// Shutting down the runtime is done by dropping the value, or calling
32 /// [`shutdown_background`] or [`shutdown_timeout`].
33 ///
34 /// Tasks spawned through [`Runtime::spawn`] keep running until they yield.
35 /// Then they are dropped. They are not *guaranteed* to run to completion, but
36 /// *might* do so if they do not yield until completion.
37 ///
38 /// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running
39 /// until they return.
40 ///
41 /// The thread initiating the shutdown blocks until all spawned work has been
42 /// stopped. This can take an indefinite amount of time. The `Drop`
43 /// implementation waits forever for this.
44 ///
45 /// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if
46 /// waiting forever is undesired. When the timeout is reached, spawned work that
47 /// did not stop in time and threads running it are leaked. The work continues
48 /// to run until one of the stopping conditions is fulfilled, but the thread
49 /// initiating the shutdown is unblocked.
50 ///
51 /// Once the runtime has been dropped, any outstanding I/O resources bound to
52 /// it will no longer function. Calling any method on them will result in an
53 /// error.
54 ///
55 /// # Sharing
56 ///
57 /// There are several ways to establish shared access to a Tokio runtime:
58 ///
59 ///  * Using an <code>[Arc]\<Runtime></code>.
60 ///  * Using a [`Handle`].
61 ///  * Entering the runtime context.
62 ///
63 /// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various
64 /// things with the runtime such as spawning new tasks or entering the runtime
65 /// context. Both types can be cloned to create a new handle that allows access
66 /// to the same runtime. By passing clones into different tasks or threads, you
67 /// will be able to access the runtime from those tasks or threads.
68 ///
69 /// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that
70 /// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down,
71 /// whereas a [`Handle`] does not prevent that. This is because shutdown of the
72 /// runtime happens when the destructor of the `Runtime` object runs.
73 ///
74 /// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive
75 /// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>,
76 /// this can be achieved via [`Arc::try_unwrap`] when only one strong count
77 /// reference is left over.
78 ///
79 /// The runtime context is entered using the [`Runtime::enter`] or
80 /// [`Handle::enter`] methods, which use a thread-local variable to store the
81 /// current runtime. Whenever you are inside the runtime context, methods such
82 /// as [`tokio::spawn`] will use the runtime whose context you are inside.
83 ///
84 /// [timer]: crate::time
85 /// [mod]: index.html
86 /// [`new`]: method@Self::new
87 /// [`Builder`]: struct@Builder
88 /// [`Handle`]: struct@Handle
89 /// [`tokio::spawn`]: crate::spawn
90 /// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap
91 /// [Arc]: std::sync::Arc
92 /// [`shutdown_background`]: method@Runtime::shutdown_background
93 /// [`shutdown_timeout`]: method@Runtime::shutdown_timeout
94 #[derive(Debug)]
95 pub struct Runtime {
96     /// Task scheduler
97     scheduler: Scheduler,
98 
99     /// Handle to runtime, also contains driver handles
100     handle: Handle,
101 
102     /// Blocking pool handle, used to signal shutdown
103     blocking_pool: BlockingPool,
104 }
105 
/// The flavor of a `Runtime`.
///
/// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()).
///
/// The enum is `#[non_exhaustive]`, so further flavors may be added later and
/// downstream `match` expressions need a wildcard arm. Values are plain unit
/// variants and can be freely copied and compared.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RuntimeFlavor {
    /// The flavor that executes all tasks on the current thread.
    CurrentThread,
    /// The flavor that executes tasks across multiple threads.
    MultiThread,
    /// The alternate flavor that executes tasks across multiple threads.
    ///
    /// This variant only exists when compiling with `--cfg tokio_unstable`.
    #[cfg(tokio_unstable)]
    MultiThreadAlt,
}
120 
121 /// The runtime scheduler is either a multi-thread or a current-thread executor.
122 #[derive(Debug)]
123 pub(super) enum Scheduler {
124     /// Execute all tasks on the current-thread.
125     CurrentThread(CurrentThread),
126 
127     /// Execute tasks across multiple threads.
128     #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
129     MultiThread(MultiThread),
130 
131     /// Execute tasks across multiple threads.
132     #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
133     MultiThreadAlt(MultiThreadAlt),
134 }
135 
136 impl Runtime {
from_parts( scheduler: Scheduler, handle: Handle, blocking_pool: BlockingPool, ) -> Runtime137     pub(super) fn from_parts(
138         scheduler: Scheduler,
139         handle: Handle,
140         blocking_pool: BlockingPool,
141     ) -> Runtime {
142         Runtime {
143             scheduler,
144             handle,
145             blocking_pool,
146         }
147     }
148 
149     cfg_not_wasi! {
150         /// Creates a new runtime instance with default configuration values.
151         ///
152         /// This results in the multi threaded scheduler, I/O driver, and time driver being
153         /// initialized.
154         ///
155         /// Most applications will not need to call this function directly. Instead,
156         /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
157         /// configuration is necessary, the [runtime builder] may be used.
158         ///
159         /// See [module level][mod] documentation for more details.
160         ///
161         /// # Examples
162         ///
163         /// Creating a new `Runtime` with default configuration values.
164         ///
165         /// ```
166         /// use tokio::runtime::Runtime;
167         ///
168         /// let rt = Runtime::new()
169         ///     .unwrap();
170         ///
171         /// // Use the runtime...
172         /// ```
173         ///
174         /// [mod]: index.html
175         /// [main]: ../attr.main.html
176         /// [threaded scheduler]: index.html#threaded-scheduler
177         /// [runtime builder]: crate::runtime::Builder
178         #[cfg(feature = "rt-multi-thread")]
179         #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
180         pub fn new() -> std::io::Result<Runtime> {
181             Builder::new_multi_thread().enable_all().build()
182         }
183     }
184 
185     /// Returns a handle to the runtime's spawner.
186     ///
187     /// The returned handle can be used to spawn tasks that run on this runtime, and can
188     /// be cloned to allow moving the `Handle` to other threads.
189     ///
190     /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone.
191     /// Refer to the documentation of [`Handle::block_on`] for more.
192     ///
193     /// # Examples
194     ///
195     /// ```
196     /// use tokio::runtime::Runtime;
197     ///
198     /// let rt = Runtime::new()
199     ///     .unwrap();
200     ///
201     /// let handle = rt.handle();
202     ///
203     /// // Use the handle...
204     /// ```
handle(&self) -> &Handle205     pub fn handle(&self) -> &Handle {
206         &self.handle
207     }
208 
209     /// Spawns a future onto the Tokio runtime.
210     ///
211     /// This spawns the given future onto the runtime's executor, usually a
212     /// thread pool. The thread pool is then responsible for polling the future
213     /// until it completes.
214     ///
215     /// The provided future will start running in the background immediately
216     /// when `spawn` is called, even if you don't await the returned
217     /// `JoinHandle`.
218     ///
219     /// See [module level][mod] documentation for more details.
220     ///
221     /// [mod]: index.html
222     ///
223     /// # Examples
224     ///
225     /// ```
226     /// use tokio::runtime::Runtime;
227     ///
228     /// # fn dox() {
229     /// // Create the runtime
230     /// let rt = Runtime::new().unwrap();
231     ///
232     /// // Spawn a future onto the runtime
233     /// rt.spawn(async {
234     ///     println!("now running on a worker thread");
235     /// });
236     /// # }
237     /// ```
238     #[track_caller]
spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,239     pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
240     where
241         F: Future + Send + 'static,
242         F::Output: Send + 'static,
243     {
244         self.handle.spawn(future)
245     }
246 
247     /// Runs the provided function on an executor dedicated to blocking operations.
248     ///
249     /// # Examples
250     ///
251     /// ```
252     /// use tokio::runtime::Runtime;
253     ///
254     /// # fn dox() {
255     /// // Create the runtime
256     /// let rt = Runtime::new().unwrap();
257     ///
258     /// // Spawn a blocking function onto the runtime
259     /// rt.spawn_blocking(|| {
260     ///     println!("now running on a worker thread");
261     /// });
262     /// # }
263     #[track_caller]
spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,264     pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
265     where
266         F: FnOnce() -> R + Send + 'static,
267         R: Send + 'static,
268     {
269         self.handle.spawn_blocking(func)
270     }
271 
272     /// Runs a future to completion on the Tokio runtime. This is the
273     /// runtime's entry point.
274     ///
275     /// This runs the given future on the current thread, blocking until it is
276     /// complete, and yielding its resolved result. Any tasks or timers
277     /// which the future spawns internally will be executed on the runtime.
278     ///
279     /// # Non-worker future
280     ///
281     /// Note that the future required by this function does not run as a
282     /// worker. The expectation is that other tasks are spawned by the future here.
283     /// Awaiting on other futures from the future provided here will not
284     /// perform as fast as those spawned as workers.
285     ///
286     /// # Multi thread scheduler
287     ///
288     /// When the multi thread scheduler is used this will allow futures
289     /// to run within the io driver and timer context of the overall runtime.
290     ///
291     /// Any spawned tasks will continue running after `block_on` returns.
292     ///
293     /// # Current thread scheduler
294     ///
295     /// When the current thread scheduler is enabled `block_on`
296     /// can be called concurrently from multiple threads. The first call
297     /// will take ownership of the io and timer drivers. This means
298     /// other threads which do not own the drivers will hook into that one.
299     /// When the first `block_on` completes, other threads will be able to
300     /// "steal" the driver to allow continued execution of their futures.
301     ///
302     /// Any spawned tasks will be suspended after `block_on` returns. Calling
303     /// `block_on` again will resume previously spawned tasks.
304     ///
305     /// # Panics
306     ///
307     /// This function panics if the provided future panics, or if called within an
308     /// asynchronous execution context.
309     ///
310     /// # Examples
311     ///
312     /// ```no_run
313     /// use tokio::runtime::Runtime;
314     ///
315     /// // Create the runtime
316     /// let rt  = Runtime::new().unwrap();
317     ///
318     /// // Execute the future, blocking the current thread until completion
319     /// rt.block_on(async {
320     ///     println!("hello");
321     /// });
322     /// ```
323     ///
324     /// [handle]: fn@Handle::block_on
325     #[track_caller]
block_on<F: Future>(&self, future: F) -> F::Output326     pub fn block_on<F: Future>(&self, future: F) -> F::Output {
327         #[cfg(all(
328             tokio_unstable,
329             tokio_taskdump,
330             feature = "rt",
331             target_os = "linux",
332             any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
333         ))]
334         let future = super::task::trace::Trace::root(future);
335 
336         #[cfg(all(tokio_unstable, feature = "tracing"))]
337         let future = crate::util::trace::task(
338             future,
339             "block_on",
340             None,
341             crate::runtime::task::Id::next().as_u64(),
342         );
343 
344         let _enter = self.enter();
345 
346         match &self.scheduler {
347             Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future),
348             #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
349             Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future),
350             #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
351             Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future),
352         }
353     }
354 
355     /// Enters the runtime context.
356     ///
357     /// This allows you to construct types that must have an executor
358     /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
359     /// also allow you to call methods such as [`tokio::spawn`].
360     ///
361     /// [`Sleep`]: struct@crate::time::Sleep
362     /// [`TcpStream`]: struct@crate::net::TcpStream
363     /// [`tokio::spawn`]: fn@crate::spawn
364     ///
365     /// # Example
366     ///
367     /// ```
368     /// use tokio::runtime::Runtime;
369     ///
370     /// fn function_that_spawns(msg: String) {
371     ///     // Had we not used `rt.enter` below, this would panic.
372     ///     tokio::spawn(async move {
373     ///         println!("{}", msg);
374     ///     });
375     /// }
376     ///
377     /// fn main() {
378     ///     let rt = Runtime::new().unwrap();
379     ///
380     ///     let s = "Hello World!".to_string();
381     ///
382     ///     // By entering the context, we tie `tokio::spawn` to this executor.
383     ///     let _guard = rt.enter();
384     ///     function_that_spawns(s);
385     /// }
386     /// ```
enter(&self) -> EnterGuard<'_>387     pub fn enter(&self) -> EnterGuard<'_> {
388         self.handle.enter()
389     }
390 
391     /// Shuts down the runtime, waiting for at most `duration` for all spawned
392     /// work to stop.
393     ///
394     /// See the [struct level documentation](Runtime#shutdown) for more details.
395     ///
396     /// # Examples
397     ///
398     /// ```
399     /// use tokio::runtime::Runtime;
400     /// use tokio::task;
401     ///
402     /// use std::thread;
403     /// use std::time::Duration;
404     ///
405     /// fn main() {
406     ///    let runtime = Runtime::new().unwrap();
407     ///
408     ///    runtime.block_on(async move {
409     ///        task::spawn_blocking(move || {
410     ///            thread::sleep(Duration::from_secs(10_000));
411     ///        });
412     ///    });
413     ///
414     ///    runtime.shutdown_timeout(Duration::from_millis(100));
415     /// }
416     /// ```
shutdown_timeout(mut self, duration: Duration)417     pub fn shutdown_timeout(mut self, duration: Duration) {
418         // Wakeup and shutdown all the worker threads
419         self.handle.inner.shutdown();
420         self.blocking_pool.shutdown(Some(duration));
421     }
422 
423     /// Shuts down the runtime, without waiting for any spawned work to stop.
424     ///
425     /// This can be useful if you want to drop a runtime from within another runtime.
426     /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
427     /// to complete, which would normally not be permitted within an asynchronous context.
428     /// By calling `shutdown_background()`, you can drop the runtime from such a context.
429     ///
430     /// Note however, that because we do not wait for any blocking tasks to complete, this
431     /// may result in a resource leak (in that any blocking tasks are still running until they
432     /// return.
433     ///
434     /// See the [struct level documentation](Runtime#shutdown) for more details.
435     ///
436     /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
437     ///
438     /// ```
439     /// use tokio::runtime::Runtime;
440     ///
441     /// fn main() {
442     ///    let runtime = Runtime::new().unwrap();
443     ///
444     ///    runtime.block_on(async move {
445     ///        let inner_runtime = Runtime::new().unwrap();
446     ///        // ...
447     ///        inner_runtime.shutdown_background();
448     ///    });
449     /// }
450     /// ```
shutdown_background(self)451     pub fn shutdown_background(self) {
452         self.shutdown_timeout(Duration::from_nanos(0))
453     }
454 }
455 
456 #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
457 impl Drop for Runtime {
drop(&mut self)458     fn drop(&mut self) {
459         match &mut self.scheduler {
460             Scheduler::CurrentThread(current_thread) => {
461                 // This ensures that tasks spawned on the current-thread
462                 // runtime are dropped inside the runtime's context.
463                 let _guard = context::try_set_current(&self.handle.inner);
464                 current_thread.shutdown(&self.handle.inner);
465             }
466             #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))]
467             Scheduler::MultiThread(multi_thread) => {
468                 // The threaded scheduler drops its tasks on its worker threads, which is
469                 // already in the runtime's context.
470                 multi_thread.shutdown(&self.handle.inner);
471             }
472             #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))]
473             Scheduler::MultiThreadAlt(multi_thread) => {
474                 // The threaded scheduler drops its tasks on its worker threads, which is
475                 // already in the runtime's context.
476                 multi_thread.shutdown(&self.handle.inner);
477             }
478         }
479     }
480 }
481 
482 cfg_metrics! {
483     impl Runtime {
484         /// TODO
485         pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
486             self.handle.metrics()
487         }
488     }
489 }
490