1 use crate::runtime::blocking::BlockingPool; 2 use crate::runtime::scheduler::CurrentThread; 3 use crate::runtime::{context, EnterGuard, Handle}; 4 use crate::task::JoinHandle; 5 6 use std::future::Future; 7 use std::time::Duration; 8 9 cfg_rt_multi_thread! { 10 use crate::runtime::Builder; 11 use crate::runtime::scheduler::MultiThread; 12 13 cfg_unstable! { 14 use crate::runtime::scheduler::MultiThreadAlt; 15 } 16 } 17 18 /// The Tokio runtime. 19 /// 20 /// The runtime provides an I/O driver, task scheduler, [timer], and 21 /// blocking pool, necessary for running asynchronous tasks. 22 /// 23 /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. 24 /// However, most users will use the `#[tokio::main]` annotation on their 25 /// entry point instead. 26 /// 27 /// See [module level][mod] documentation for more details. 28 /// 29 /// # Shutdown 30 /// 31 /// Shutting down the runtime is done by dropping the value, or calling 32 /// [`shutdown_background`] or [`shutdown_timeout`]. 33 /// 34 /// Tasks spawned through [`Runtime::spawn`] keep running until they yield. 35 /// Then they are dropped. They are not *guaranteed* to run to completion, but 36 /// *might* do so if they do not yield until completion. 37 /// 38 /// Blocking functions spawned through [`Runtime::spawn_blocking`] keep running 39 /// until they return. 40 /// 41 /// The thread initiating the shutdown blocks until all spawned work has been 42 /// stopped. This can take an indefinite amount of time. The `Drop` 43 /// implementation waits forever for this. 44 /// 45 /// The [`shutdown_background`] and [`shutdown_timeout`] methods can be used if 46 /// waiting forever is undesired. When the timeout is reached, spawned work that 47 /// did not stop in time and threads running it are leaked. The work continues 48 /// to run until one of the stopping conditions is fulfilled, but the thread 49 /// initiating the shutdown is unblocked. 50 /// 51 /// Once the runtime has been dropped, any outstanding I/O resources bound to 52 /// it will no longer function. Calling any method on them will result in an 53 /// error. 54 /// 55 /// # Sharing 56 /// 57 /// There are several ways to establish shared access to a Tokio runtime: 58 /// 59 /// * Using an <code>[Arc]\<Runtime></code>. 60 /// * Using a [`Handle`]. 61 /// * Entering the runtime context. 62 /// 63 /// Using an <code>[Arc]\<Runtime></code> or [`Handle`] allows you to do various 64 /// things with the runtime such as spawning new tasks or entering the runtime 65 /// context. Both types can be cloned to create a new handle that allows access 66 /// to the same runtime. By passing clones into different tasks or threads, you 67 /// will be able to access the runtime from those tasks or threads. 68 /// 69 /// The difference between <code>[Arc]\<Runtime></code> and [`Handle`] is that 70 /// an <code>[Arc]\<Runtime></code> will prevent the runtime from shutting down, 71 /// whereas a [`Handle`] does not prevent that. This is because shutdown of the 72 /// runtime happens when the destructor of the `Runtime` object runs. 73 /// 74 /// Calls to [`shutdown_background`] and [`shutdown_timeout`] require exclusive 75 /// ownership of the `Runtime` type. When using an <code>[Arc]\<Runtime></code>, 76 /// this can be achieved via [`Arc::try_unwrap`] when only one strong count 77 /// reference is left over. 78 /// 79 /// The runtime context is entered using the [`Runtime::enter`] or 80 /// [`Handle::enter`] methods, which use a thread-local variable to store the 81 /// current runtime. Whenever you are inside the runtime context, methods such 82 /// as [`tokio::spawn`] will use the runtime whose context you are inside. 83 /// 84 /// [timer]: crate::time 85 /// [mod]: index.html 86 /// [`new`]: method@Self::new 87 /// [`Builder`]: struct@Builder 88 /// [`Handle`]: struct@Handle 89 /// [`tokio::spawn`]: crate::spawn 90 /// [`Arc::try_unwrap`]: std::sync::Arc::try_unwrap 91 /// [Arc]: std::sync::Arc 92 /// [`shutdown_background`]: method@Runtime::shutdown_background 93 /// [`shutdown_timeout`]: method@Runtime::shutdown_timeout 94 #[derive(Debug)] 95 pub struct Runtime { 96 /// Task scheduler 97 scheduler: Scheduler, 98 99 /// Handle to runtime, also contains driver handles 100 handle: Handle, 101 102 /// Blocking pool handle, used to signal shutdown 103 blocking_pool: BlockingPool, 104 } 105 106 /// The flavor of a `Runtime`. 107 /// 108 /// This is the return type for [`Handle::runtime_flavor`](crate::runtime::Handle::runtime_flavor()). 109 #[derive(Debug, PartialEq, Eq)] 110 #[non_exhaustive] 111 pub enum RuntimeFlavor { 112 /// The flavor that executes all tasks on the current thread. 113 CurrentThread, 114 /// The flavor that executes tasks across multiple threads. 115 MultiThread, 116 /// The flavor that executes tasks across multiple threads. 117 #[cfg(tokio_unstable)] 118 MultiThreadAlt, 119 } 120 121 /// The runtime scheduler is either a multi-thread or a current-thread executor. 122 #[derive(Debug)] 123 pub(super) enum Scheduler { 124 /// Execute all tasks on the current-thread. 125 CurrentThread(CurrentThread), 126 127 /// Execute tasks across multiple threads. 128 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] 129 MultiThread(MultiThread), 130 131 /// Execute tasks across multiple threads. 132 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] 133 MultiThreadAlt(MultiThreadAlt), 134 } 135 136 impl Runtime { from_parts( scheduler: Scheduler, handle: Handle, blocking_pool: BlockingPool, ) -> Runtime137 pub(super) fn from_parts( 138 scheduler: Scheduler, 139 handle: Handle, 140 blocking_pool: BlockingPool, 141 ) -> Runtime { 142 Runtime { 143 scheduler, 144 handle, 145 blocking_pool, 146 } 147 } 148 149 cfg_not_wasi! { 150 /// Creates a new runtime instance with default configuration values. 151 /// 152 /// This results in the multi threaded scheduler, I/O driver, and time driver being 153 /// initialized. 154 /// 155 /// Most applications will not need to call this function directly. Instead, 156 /// they will use the [`#[tokio::main]` attribute][main]. When a more complex 157 /// configuration is necessary, the [runtime builder] may be used. 158 /// 159 /// See [module level][mod] documentation for more details. 160 /// 161 /// # Examples 162 /// 163 /// Creating a new `Runtime` with default configuration values. 164 /// 165 /// ``` 166 /// use tokio::runtime::Runtime; 167 /// 168 /// let rt = Runtime::new() 169 /// .unwrap(); 170 /// 171 /// // Use the runtime... 172 /// ``` 173 /// 174 /// [mod]: index.html 175 /// [main]: ../attr.main.html 176 /// [threaded scheduler]: index.html#threaded-scheduler 177 /// [runtime builder]: crate::runtime::Builder 178 #[cfg(feature = "rt-multi-thread")] 179 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] 180 pub fn new() -> std::io::Result<Runtime> { 181 Builder::new_multi_thread().enable_all().build() 182 } 183 } 184 185 /// Returns a handle to the runtime's spawner. 186 /// 187 /// The returned handle can be used to spawn tasks that run on this runtime, and can 188 /// be cloned to allow moving the `Handle` to other threads. 189 /// 190 /// Calling [`Handle::block_on`] on a handle to a `current_thread` runtime is error-prone. 191 /// Refer to the documentation of [`Handle::block_on`] for more. 192 /// 193 /// # Examples 194 /// 195 /// ``` 196 /// use tokio::runtime::Runtime; 197 /// 198 /// let rt = Runtime::new() 199 /// .unwrap(); 200 /// 201 /// let handle = rt.handle(); 202 /// 203 /// // Use the handle... 204 /// ``` handle(&self) -> &Handle205 pub fn handle(&self) -> &Handle { 206 &self.handle 207 } 208 209 /// Spawns a future onto the Tokio runtime. 210 /// 211 /// This spawns the given future onto the runtime's executor, usually a 212 /// thread pool. The thread pool is then responsible for polling the future 213 /// until it completes. 214 /// 215 /// The provided future will start running in the background immediately 216 /// when `spawn` is called, even if you don't await the returned 217 /// `JoinHandle`. 218 /// 219 /// See [module level][mod] documentation for more details. 220 /// 221 /// [mod]: index.html 222 /// 223 /// # Examples 224 /// 225 /// ``` 226 /// use tokio::runtime::Runtime; 227 /// 228 /// # fn dox() { 229 /// // Create the runtime 230 /// let rt = Runtime::new().unwrap(); 231 /// 232 /// // Spawn a future onto the runtime 233 /// rt.spawn(async { 234 /// println!("now running on a worker thread"); 235 /// }); 236 /// # } 237 /// ``` 238 #[track_caller] spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,239 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> 240 where 241 F: Future + Send + 'static, 242 F::Output: Send + 'static, 243 { 244 self.handle.spawn(future) 245 } 246 247 /// Runs the provided function on an executor dedicated to blocking operations. 248 /// 249 /// # Examples 250 /// 251 /// ``` 252 /// use tokio::runtime::Runtime; 253 /// 254 /// # fn dox() { 255 /// // Create the runtime 256 /// let rt = Runtime::new().unwrap(); 257 /// 258 /// // Spawn a blocking function onto the runtime 259 /// rt.spawn_blocking(|| { 260 /// println!("now running on a worker thread"); 261 /// }); 262 /// # } 263 #[track_caller] spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,264 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> 265 where 266 F: FnOnce() -> R + Send + 'static, 267 R: Send + 'static, 268 { 269 self.handle.spawn_blocking(func) 270 } 271 272 /// Runs a future to completion on the Tokio runtime. This is the 273 /// runtime's entry point. 274 /// 275 /// This runs the given future on the current thread, blocking until it is 276 /// complete, and yielding its resolved result. Any tasks or timers 277 /// which the future spawns internally will be executed on the runtime. 278 /// 279 /// # Non-worker future 280 /// 281 /// Note that the future required by this function does not run as a 282 /// worker. The expectation is that other tasks are spawned by the future here. 283 /// Awaiting on other futures from the future provided here will not 284 /// perform as fast as those spawned as workers. 285 /// 286 /// # Multi thread scheduler 287 /// 288 /// When the multi thread scheduler is used this will allow futures 289 /// to run within the io driver and timer context of the overall runtime. 290 /// 291 /// Any spawned tasks will continue running after `block_on` returns. 292 /// 293 /// # Current thread scheduler 294 /// 295 /// When the current thread scheduler is enabled `block_on` 296 /// can be called concurrently from multiple threads. The first call 297 /// will take ownership of the io and timer drivers. This means 298 /// other threads which do not own the drivers will hook into that one. 299 /// When the first `block_on` completes, other threads will be able to 300 /// "steal" the driver to allow continued execution of their futures. 301 /// 302 /// Any spawned tasks will be suspended after `block_on` returns. Calling 303 /// `block_on` again will resume previously spawned tasks. 304 /// 305 /// # Panics 306 /// 307 /// This function panics if the provided future panics, or if called within an 308 /// asynchronous execution context. 309 /// 310 /// # Examples 311 /// 312 /// ```no_run 313 /// use tokio::runtime::Runtime; 314 /// 315 /// // Create the runtime 316 /// let rt = Runtime::new().unwrap(); 317 /// 318 /// // Execute the future, blocking the current thread until completion 319 /// rt.block_on(async { 320 /// println!("hello"); 321 /// }); 322 /// ``` 323 /// 324 /// [handle]: fn@Handle::block_on 325 #[track_caller] block_on<F: Future>(&self, future: F) -> F::Output326 pub fn block_on<F: Future>(&self, future: F) -> F::Output { 327 #[cfg(all( 328 tokio_unstable, 329 tokio_taskdump, 330 feature = "rt", 331 target_os = "linux", 332 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") 333 ))] 334 let future = super::task::trace::Trace::root(future); 335 336 #[cfg(all(tokio_unstable, feature = "tracing"))] 337 let future = crate::util::trace::task( 338 future, 339 "block_on", 340 None, 341 crate::runtime::task::Id::next().as_u64(), 342 ); 343 344 let _enter = self.enter(); 345 346 match &self.scheduler { 347 Scheduler::CurrentThread(exec) => exec.block_on(&self.handle.inner, future), 348 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] 349 Scheduler::MultiThread(exec) => exec.block_on(&self.handle.inner, future), 350 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] 351 Scheduler::MultiThreadAlt(exec) => exec.block_on(&self.handle.inner, future), 352 } 353 } 354 355 /// Enters the runtime context. 356 /// 357 /// This allows you to construct types that must have an executor 358 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will 359 /// also allow you to call methods such as [`tokio::spawn`]. 360 /// 361 /// [`Sleep`]: struct@crate::time::Sleep 362 /// [`TcpStream`]: struct@crate::net::TcpStream 363 /// [`tokio::spawn`]: fn@crate::spawn 364 /// 365 /// # Example 366 /// 367 /// ``` 368 /// use tokio::runtime::Runtime; 369 /// 370 /// fn function_that_spawns(msg: String) { 371 /// // Had we not used `rt.enter` below, this would panic. 372 /// tokio::spawn(async move { 373 /// println!("{}", msg); 374 /// }); 375 /// } 376 /// 377 /// fn main() { 378 /// let rt = Runtime::new().unwrap(); 379 /// 380 /// let s = "Hello World!".to_string(); 381 /// 382 /// // By entering the context, we tie `tokio::spawn` to this executor. 383 /// let _guard = rt.enter(); 384 /// function_that_spawns(s); 385 /// } 386 /// ``` enter(&self) -> EnterGuard<'_>387 pub fn enter(&self) -> EnterGuard<'_> { 388 self.handle.enter() 389 } 390 391 /// Shuts down the runtime, waiting for at most `duration` for all spawned 392 /// work to stop. 393 /// 394 /// See the [struct level documentation](Runtime#shutdown) for more details. 395 /// 396 /// # Examples 397 /// 398 /// ``` 399 /// use tokio::runtime::Runtime; 400 /// use tokio::task; 401 /// 402 /// use std::thread; 403 /// use std::time::Duration; 404 /// 405 /// fn main() { 406 /// let runtime = Runtime::new().unwrap(); 407 /// 408 /// runtime.block_on(async move { 409 /// task::spawn_blocking(move || { 410 /// thread::sleep(Duration::from_secs(10_000)); 411 /// }); 412 /// }); 413 /// 414 /// runtime.shutdown_timeout(Duration::from_millis(100)); 415 /// } 416 /// ``` shutdown_timeout(mut self, duration: Duration)417 pub fn shutdown_timeout(mut self, duration: Duration) { 418 // Wakeup and shutdown all the worker threads 419 self.handle.inner.shutdown(); 420 self.blocking_pool.shutdown(Some(duration)); 421 } 422 423 /// Shuts down the runtime, without waiting for any spawned work to stop. 424 /// 425 /// This can be useful if you want to drop a runtime from within another runtime. 426 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks 427 /// to complete, which would normally not be permitted within an asynchronous context. 428 /// By calling `shutdown_background()`, you can drop the runtime from such a context. 429 /// 430 /// Note however, that because we do not wait for any blocking tasks to complete, this 431 /// may result in a resource leak (in that any blocking tasks are still running until they 432 /// return. 433 /// 434 /// See the [struct level documentation](Runtime#shutdown) for more details. 435 /// 436 /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`. 437 /// 438 /// ``` 439 /// use tokio::runtime::Runtime; 440 /// 441 /// fn main() { 442 /// let runtime = Runtime::new().unwrap(); 443 /// 444 /// runtime.block_on(async move { 445 /// let inner_runtime = Runtime::new().unwrap(); 446 /// // ... 447 /// inner_runtime.shutdown_background(); 448 /// }); 449 /// } 450 /// ``` shutdown_background(self)451 pub fn shutdown_background(self) { 452 self.shutdown_timeout(Duration::from_nanos(0)) 453 } 454 } 455 456 #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let 457 impl Drop for Runtime { drop(&mut self)458 fn drop(&mut self) { 459 match &mut self.scheduler { 460 Scheduler::CurrentThread(current_thread) => { 461 // This ensures that tasks spawned on the current-thread 462 // runtime are dropped inside the runtime's context. 463 let _guard = context::try_set_current(&self.handle.inner); 464 current_thread.shutdown(&self.handle.inner); 465 } 466 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] 467 Scheduler::MultiThread(multi_thread) => { 468 // The threaded scheduler drops its tasks on its worker threads, which is 469 // already in the runtime's context. 470 multi_thread.shutdown(&self.handle.inner); 471 } 472 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] 473 Scheduler::MultiThreadAlt(multi_thread) => { 474 // The threaded scheduler drops its tasks on its worker threads, which is 475 // already in the runtime's context. 476 multi_thread.shutdown(&self.handle.inner); 477 } 478 } 479 } 480 } 481 482 cfg_metrics! { 483 impl Runtime { 484 /// TODO 485 pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { 486 self.handle.metrics() 487 } 488 } 489 } 490