1 //! The Tokio runtime. 2 //! 3 //! Unlike other Rust programs, asynchronous applications require runtime 4 //! support. In particular, the following runtime services are necessary: 5 //! 6 //! * An **I/O event loop**, called the driver, which drives I/O resources and 7 //! dispatches I/O events to tasks that depend on them. 8 //! * A **scheduler** to execute [tasks] that use these I/O resources. 9 //! * A **timer** for scheduling work to run after a set period of time. 10 //! 11 //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing 12 //! them to be started, shut down, and configured together. However, often it is 13 //! not required to configure a [`Runtime`] manually, and a user may just use the 14 //! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood. 15 //! 16 //! # Usage 17 //! 18 //! When no fine tuning is required, the [`tokio::main`] attribute macro can be 19 //! used. 20 //! 21 //! ```no_run 22 //! use tokio::net::TcpListener; 23 //! use tokio::io::{AsyncReadExt, AsyncWriteExt}; 24 //! 25 //! #[tokio::main] 26 //! async fn main() -> Result<(), Box<dyn std::error::Error>> { 27 //! let listener = TcpListener::bind("127.0.0.1:8080").await?; 28 //! 29 //! loop { 30 //! let (mut socket, _) = listener.accept().await?; 31 //! 32 //! tokio::spawn(async move { 33 //! let mut buf = [0; 1024]; 34 //! 35 //! // In a loop, read data from the socket and write the data back. 36 //! loop { 37 //! let n = match socket.read(&mut buf).await { 38 //! // socket closed 39 //! Ok(n) if n == 0 => return, 40 //! Ok(n) => n, 41 //! Err(e) => { 42 //! println!("failed to read from socket; err = {:?}", e); 43 //! return; 44 //! } 45 //! }; 46 //! 47 //! // Write the data back 48 //! if let Err(e) = socket.write_all(&buf[0..n]).await { 49 //! println!("failed to write to socket; err = {:?}", e); 50 //! return; 51 //! } 52 //! } 53 //! }); 54 //! } 55 //! } 56 //! ``` 57 //! 58 //! From within the context of the runtime, additional tasks are spawned using 59 //! the [`tokio::spawn`] function. Futures spawned using this function will be 60 //! executed on the same thread pool used by the [`Runtime`]. 61 //! 62 //! A [`Runtime`] instance can also be used directly. 63 //! 64 //! ```no_run 65 //! use tokio::net::TcpListener; 66 //! use tokio::io::{AsyncReadExt, AsyncWriteExt}; 67 //! use tokio::runtime::Runtime; 68 //! 69 //! fn main() -> Result<(), Box<dyn std::error::Error>> { 70 //! // Create the runtime 71 //! let rt = Runtime::new()?; 72 //! 73 //! // Spawn the root task 74 //! rt.block_on(async { 75 //! let listener = TcpListener::bind("127.0.0.1:8080").await?; 76 //! 77 //! loop { 78 //! let (mut socket, _) = listener.accept().await?; 79 //! 80 //! tokio::spawn(async move { 81 //! let mut buf = [0; 1024]; 82 //! 83 //! // In a loop, read data from the socket and write the data back. 84 //! loop { 85 //! let n = match socket.read(&mut buf).await { 86 //! // socket closed 87 //! Ok(n) if n == 0 => return, 88 //! Ok(n) => n, 89 //! Err(e) => { 90 //! println!("failed to read from socket; err = {:?}", e); 91 //! return; 92 //! } 93 //! }; 94 //! 95 //! // Write the data back 96 //! if let Err(e) = socket.write_all(&buf[0..n]).await { 97 //! println!("failed to write to socket; err = {:?}", e); 98 //! return; 99 //! } 100 //! } 101 //! }); 102 //! } 103 //! }) 104 //! } 105 //! ``` 106 //! 107 //! ## Runtime Configurations 108 //! 109 //! Tokio provides multiple task scheduling strategies, suitable for different 110 //! applications. The [runtime builder] or `#[tokio::main]` attribute may be 111 //! used to select which scheduler to use. 112 //! 113 //! #### Multi-Thread Scheduler 114 //! 115 //! The multi-thread scheduler executes futures on a _thread pool_, using a 116 //! work-stealing strategy. By default, it will start a worker thread for each 117 //! CPU core available on the system. This tends to be the ideal configuration 118 //! for most applications. The multi-thread scheduler requires the `rt-multi-thread` 119 //! feature flag, and is selected by default: 120 //! ``` 121 //! use tokio::runtime; 122 //! 123 //! # fn main() -> Result<(), Box<dyn std::error::Error>> { 124 //! let threaded_rt = runtime::Runtime::new()?; 125 //! # Ok(()) } 126 //! ``` 127 //! 128 //! Most applications should use the multi-thread scheduler, except in some 129 //! niche use-cases, such as when running only a single thread is required. 130 //! 131 //! #### Current-Thread Scheduler 132 //! 133 //! The current-thread scheduler provides a _single-threaded_ future executor. 134 //! All tasks will be created and executed on the current thread. This requires 135 //! the `rt` feature flag. 136 //! ``` 137 //! use tokio::runtime; 138 //! 139 //! # fn main() -> Result<(), Box<dyn std::error::Error>> { 140 //! let basic_rt = runtime::Builder::new_current_thread() 141 //! .build()?; 142 //! # Ok(()) } 143 //! ``` 144 //! 145 //! #### Resource drivers 146 //! 147 //! When configuring a runtime by hand, no resource drivers are enabled by 148 //! default. In this case, attempting to use networking types or time types will 149 //! fail. In order to enable these types, the resource drivers must be enabled. 150 //! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a 151 //! shorthand, [`Builder::enable_all`] enables both resource drivers. 152 //! 153 //! ## Lifetime of spawned threads 154 //! 155 //! The runtime may spawn threads depending on its configuration and usage. The 156 //! multi-thread scheduler spawns threads to schedule tasks and for `spawn_blocking` 157 //! calls. 158 //! 159 //! While the `Runtime` is active, threads may shutdown after periods of being 160 //! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown. 161 //! Any tasks that have not yet completed will be dropped. 162 //! 163 //! [tasks]: crate::task 164 //! [`Runtime`]: Runtime 165 //! [`tokio::spawn`]: crate::spawn 166 //! [`tokio::main`]: ../attr.main.html 167 //! [runtime builder]: crate::runtime::Builder 168 //! [`Runtime::new`]: crate::runtime::Runtime::new 169 //! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler 170 //! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler 171 //! [`Builder::enable_io`]: crate::runtime::Builder::enable_io 172 //! [`Builder::enable_time`]: crate::runtime::Builder::enable_time 173 //! [`Builder::enable_all`]: crate::runtime::Builder::enable_all 174 175 // At the top due to macros 176 #[cfg(test)] 177 #[macro_use] 178 mod tests; 179 180 pub(crate) mod enter; 181 182 pub(crate) mod task; 183 184 cfg_stats! { 185 pub mod stats; 186 } 187 cfg_not_stats! { 188 pub(crate) mod stats; 189 } 190 191 cfg_rt! { 192 mod basic_scheduler; 193 use basic_scheduler::BasicScheduler; 194 195 mod blocking; 196 use blocking::BlockingPool; 197 pub(crate) use blocking::spawn_blocking; 198 199 mod builder; 200 pub use self::builder::Builder; 201 202 pub(crate) mod context; 203 pub(crate) mod driver; 204 205 use self::enter::enter; 206 207 mod handle; 208 pub use handle::{EnterGuard, Handle, TryCurrentError}; 209 210 mod spawner; 211 use self::spawner::Spawner; 212 } 213 214 cfg_rt_multi_thread! { 215 mod park; 216 use park::Parker; 217 } 218 219 cfg_rt_multi_thread! { 220 mod queue; 221 222 pub(crate) mod thread_pool; 223 use self::thread_pool::ThreadPool; 224 } 225 226 cfg_rt! { 227 use crate::task::JoinHandle; 228 229 use std::future::Future; 230 use std::time::Duration; 231 232 /// The Tokio runtime. 233 /// 234 /// The runtime provides an I/O driver, task scheduler, [timer], and 235 /// blocking pool, necessary for running asynchronous tasks. 236 /// 237 /// Instances of `Runtime` can be created using [`new`], or [`Builder`]. 238 /// However, most users will use the `#[tokio::main]` annotation on their 239 /// entry point instead. 240 /// 241 /// See [module level][mod] documentation for more details. 242 /// 243 /// # Shutdown 244 /// 245 /// Shutting down the runtime is done by dropping the value. The current 246 /// thread will block until the shut down operation has completed. 247 /// 248 /// * Drain any scheduled work queues. 249 /// * Drop any futures that have not yet completed. 250 /// * Drop the reactor. 251 /// 252 /// Once the reactor has dropped, any outstanding I/O resources bound to 253 /// that reactor will no longer function. Calling any method on them will 254 /// result in an error. 255 /// 256 /// # Sharing 257 /// 258 /// The Tokio runtime implements `Sync` and `Send` to allow you to wrap it 259 /// in a `Arc`. Most fn take `&self` to allow you to call them concurrently 260 /// across multiple threads. 261 /// 262 /// Calls to `shutdown` and `shutdown_timeout` require exclusive ownership of 263 /// the runtime type and this can be achieved via `Arc::try_unwrap` when only 264 /// one strong count reference is left over. 265 /// 266 /// [timer]: crate::time 267 /// [mod]: index.html 268 /// [`new`]: method@Self::new 269 /// [`Builder`]: struct@Builder 270 #[derive(Debug)] 271 pub struct Runtime { 272 /// Task executor 273 kind: Kind, 274 275 /// Handle to runtime, also contains driver handles 276 handle: Handle, 277 278 /// Blocking pool handle, used to signal shutdown 279 blocking_pool: BlockingPool, 280 } 281 282 /// The runtime executor is either a thread-pool or a current-thread executor. 283 #[derive(Debug)] 284 enum Kind { 285 /// Execute all tasks on the current-thread. 286 CurrentThread(BasicScheduler<driver::Driver>), 287 288 /// Execute tasks across multiple threads. 289 #[cfg(feature = "rt-multi-thread")] 290 ThreadPool(ThreadPool), 291 } 292 293 /// After thread starts / before thread stops 294 type Callback = std::sync::Arc<dyn Fn() + Send + Sync>; 295 296 impl Runtime { 297 /// Creates a new runtime instance with default configuration values. 298 /// 299 /// This results in the multi threaded scheduler, I/O driver, and time driver being 300 /// initialized. 301 /// 302 /// Most applications will not need to call this function directly. Instead, 303 /// they will use the [`#[tokio::main]` attribute][main]. When a more complex 304 /// configuration is necessary, the [runtime builder] may be used. 305 /// 306 /// See [module level][mod] documentation for more details. 307 /// 308 /// # Examples 309 /// 310 /// Creating a new `Runtime` with default configuration values. 311 /// 312 /// ``` 313 /// use tokio::runtime::Runtime; 314 /// 315 /// let rt = Runtime::new() 316 /// .unwrap(); 317 /// 318 /// // Use the runtime... 319 /// ``` 320 /// 321 /// [mod]: index.html 322 /// [main]: ../attr.main.html 323 /// [threaded scheduler]: index.html#threaded-scheduler 324 /// [basic scheduler]: index.html#basic-scheduler 325 /// [runtime builder]: crate::runtime::Builder 326 #[cfg(feature = "rt-multi-thread")] 327 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] 328 pub fn new() -> std::io::Result<Runtime> { 329 Builder::new_multi_thread().enable_all().build() 330 } 331 332 /// Returns a handle to the runtime's spawner. 333 /// 334 /// The returned handle can be used to spawn tasks that run on this runtime, and can 335 /// be cloned to allow moving the `Handle` to other threads. 336 /// 337 /// # Examples 338 /// 339 /// ``` 340 /// use tokio::runtime::Runtime; 341 /// 342 /// let rt = Runtime::new() 343 /// .unwrap(); 344 /// 345 /// let handle = rt.handle(); 346 /// 347 /// // Use the handle... 348 /// ``` 349 pub fn handle(&self) -> &Handle { 350 &self.handle 351 } 352 353 /// Spawns a future onto the Tokio runtime. 354 /// 355 /// This spawns the given future onto the runtime's executor, usually a 356 /// thread pool. The thread pool is then responsible for polling the future 357 /// until it completes. 358 /// 359 /// See [module level][mod] documentation for more details. 360 /// 361 /// [mod]: index.html 362 /// 363 /// # Examples 364 /// 365 /// ``` 366 /// use tokio::runtime::Runtime; 367 /// 368 /// # fn dox() { 369 /// // Create the runtime 370 /// let rt = Runtime::new().unwrap(); 371 /// 372 /// // Spawn a future onto the runtime 373 /// rt.spawn(async { 374 /// println!("now running on a worker thread"); 375 /// }); 376 /// # } 377 /// ``` 378 #[cfg_attr(tokio_track_caller, track_caller)] 379 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> 380 where 381 F: Future + Send + 'static, 382 F::Output: Send + 'static, 383 { 384 self.handle.spawn(future) 385 } 386 387 /// Runs the provided function on an executor dedicated to blocking operations. 388 /// 389 /// # Examples 390 /// 391 /// ``` 392 /// use tokio::runtime::Runtime; 393 /// 394 /// # fn dox() { 395 /// // Create the runtime 396 /// let rt = Runtime::new().unwrap(); 397 /// 398 /// // Spawn a blocking function onto the runtime 399 /// rt.spawn_blocking(|| { 400 /// println!("now running on a worker thread"); 401 /// }); 402 /// # } 403 #[cfg_attr(tokio_track_caller, track_caller)] 404 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> 405 where 406 F: FnOnce() -> R + Send + 'static, 407 R: Send + 'static, 408 { 409 self.handle.spawn_blocking(func) 410 } 411 412 /// Runs a future to completion on the Tokio runtime. This is the 413 /// runtime's entry point. 414 /// 415 /// This runs the given future on the current thread, blocking until it is 416 /// complete, and yielding its resolved result. Any tasks or timers 417 /// which the future spawns internally will be executed on the runtime. 418 /// 419 /// # Multi thread scheduler 420 /// 421 /// When the multi thread scheduler is used this will allow futures 422 /// to run within the io driver and timer context of the overall runtime. 423 /// 424 /// # Current thread scheduler 425 /// 426 /// When the current thread scheduler is enabled `block_on` 427 /// can be called concurrently from multiple threads. The first call 428 /// will take ownership of the io and timer drivers. This means 429 /// other threads which do not own the drivers will hook into that one. 430 /// When the first `block_on` completes, other threads will be able to 431 /// "steal" the driver to allow continued execution of their futures. 432 /// 433 /// # Panics 434 /// 435 /// This function panics if the provided future panics, or if called within an 436 /// asynchronous execution context. 437 /// 438 /// # Examples 439 /// 440 /// ```no_run 441 /// use tokio::runtime::Runtime; 442 /// 443 /// // Create the runtime 444 /// let rt = Runtime::new().unwrap(); 445 /// 446 /// // Execute the future, blocking the current thread until completion 447 /// rt.block_on(async { 448 /// println!("hello"); 449 /// }); 450 /// ``` 451 /// 452 /// [handle]: fn@Handle::block_on 453 #[cfg_attr(tokio_track_caller, track_caller)] 454 pub fn block_on<F: Future>(&self, future: F) -> F::Output { 455 #[cfg(all(tokio_unstable, feature = "tracing"))] 456 let future = crate::util::trace::task(future, "block_on", None); 457 458 let _enter = self.enter(); 459 460 match &self.kind { 461 Kind::CurrentThread(exec) => exec.block_on(future), 462 #[cfg(feature = "rt-multi-thread")] 463 Kind::ThreadPool(exec) => exec.block_on(future), 464 } 465 } 466 467 /// Enters the runtime context. 468 /// 469 /// This allows you to construct types that must have an executor 470 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will 471 /// also allow you to call methods such as [`tokio::spawn`]. 472 /// 473 /// [`Sleep`]: struct@crate::time::Sleep 474 /// [`TcpStream`]: struct@crate::net::TcpStream 475 /// [`tokio::spawn`]: fn@crate::spawn 476 /// 477 /// # Example 478 /// 479 /// ``` 480 /// use tokio::runtime::Runtime; 481 /// 482 /// fn function_that_spawns(msg: String) { 483 /// // Had we not used `rt.enter` below, this would panic. 484 /// tokio::spawn(async move { 485 /// println!("{}", msg); 486 /// }); 487 /// } 488 /// 489 /// fn main() { 490 /// let rt = Runtime::new().unwrap(); 491 /// 492 /// let s = "Hello World!".to_string(); 493 /// 494 /// // By entering the context, we tie `tokio::spawn` to this executor. 495 /// let _guard = rt.enter(); 496 /// function_that_spawns(s); 497 /// } 498 /// ``` 499 pub fn enter(&self) -> EnterGuard<'_> { 500 self.handle.enter() 501 } 502 503 /// Shuts down the runtime, waiting for at most `duration` for all spawned 504 /// task to shutdown. 505 /// 506 /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to 507 /// shutdown in a timely fashion. However, dropping a `Runtime` will wait 508 /// indefinitely for all tasks to terminate, and there are cases where a long 509 /// blocking task has been spawned, which can block dropping `Runtime`. 510 /// 511 /// In this case, calling `shutdown_timeout` with an explicit wait timeout 512 /// can work. The `shutdown_timeout` will signal all tasks to shutdown and 513 /// will wait for at most `duration` for all spawned tasks to terminate. If 514 /// `timeout` elapses before all tasks are dropped, the function returns and 515 /// outstanding tasks are potentially leaked. 516 /// 517 /// # Examples 518 /// 519 /// ``` 520 /// use tokio::runtime::Runtime; 521 /// use tokio::task; 522 /// 523 /// use std::thread; 524 /// use std::time::Duration; 525 /// 526 /// fn main() { 527 /// let runtime = Runtime::new().unwrap(); 528 /// 529 /// runtime.block_on(async move { 530 /// task::spawn_blocking(move || { 531 /// thread::sleep(Duration::from_secs(10_000)); 532 /// }); 533 /// }); 534 /// 535 /// runtime.shutdown_timeout(Duration::from_millis(100)); 536 /// } 537 /// ``` 538 pub fn shutdown_timeout(mut self, duration: Duration) { 539 // Wakeup and shutdown all the worker threads 540 self.handle.clone().shutdown(); 541 self.blocking_pool.shutdown(Some(duration)); 542 } 543 544 /// Shuts down the runtime, without waiting for any spawned tasks to shutdown. 545 /// 546 /// This can be useful if you want to drop a runtime from within another runtime. 547 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks 548 /// to complete, which would normally not be permitted within an asynchronous context. 549 /// By calling `shutdown_background()`, you can drop the runtime from such a context. 550 /// 551 /// Note however, that because we do not wait for any blocking tasks to complete, this 552 /// may result in a resource leak (in that any blocking tasks are still running until they 553 /// return. 554 /// 555 /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`. 556 /// 557 /// ``` 558 /// use tokio::runtime::Runtime; 559 /// 560 /// fn main() { 561 /// let runtime = Runtime::new().unwrap(); 562 /// 563 /// runtime.block_on(async move { 564 /// let inner_runtime = Runtime::new().unwrap(); 565 /// // ... 566 /// inner_runtime.shutdown_background(); 567 /// }); 568 /// } 569 /// ``` 570 pub fn shutdown_background(self) { 571 self.shutdown_timeout(Duration::from_nanos(0)) 572 } 573 } 574 575 #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let 576 impl Drop for Runtime { 577 fn drop(&mut self) { 578 match &mut self.kind { 579 Kind::CurrentThread(basic) => { 580 // This ensures that tasks spawned on the basic runtime are dropped inside the 581 // runtime's context. 582 match self::context::try_enter(self.handle.clone()) { 583 Some(guard) => basic.set_context_guard(guard), 584 None => { 585 // The context thread-local has alread been destroyed. 586 // 587 // We don't set the guard in this case. Calls to tokio::spawn in task 588 // destructors would fail regardless if this happens. 589 }, 590 } 591 }, 592 #[cfg(feature = "rt-multi-thread")] 593 Kind::ThreadPool(_) => { 594 // The threaded scheduler drops its tasks on its worker threads, which is 595 // already in the runtime's context. 596 }, 597 } 598 } 599 } 600 } 601