1 use crate::runtime::handle::Handle; 2 use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime}; 3 use crate::util::rand::{RngSeed, RngSeedGenerator}; 4 5 use std::fmt; 6 use std::io; 7 use std::time::Duration; 8 9 /// Builds Tokio Runtime with custom configuration values. 10 /// 11 /// Methods can be chained in order to set the configuration values. The 12 /// Runtime is constructed by calling [`build`]. 13 /// 14 /// New instances of `Builder` are obtained via [`Builder::new_multi_thread`] 15 /// or [`Builder::new_current_thread`]. 16 /// 17 /// See function level documentation for details on the various configuration 18 /// settings. 19 /// 20 /// [`build`]: method@Self::build 21 /// [`Builder::new_multi_thread`]: method@Self::new_multi_thread 22 /// [`Builder::new_current_thread`]: method@Self::new_current_thread 23 /// 24 /// # Examples 25 /// 26 /// ``` 27 /// use tokio::runtime::Builder; 28 /// 29 /// fn main() { 30 /// // build runtime 31 /// let runtime = Builder::new_multi_thread() 32 /// .worker_threads(4) 33 /// .thread_name("my-custom-name") 34 /// .thread_stack_size(3 * 1024 * 1024) 35 /// .build() 36 /// .unwrap(); 37 /// 38 /// // use runtime ... 39 /// } 40 /// ``` 41 pub struct Builder { 42 /// Runtime type 43 kind: Kind, 44 45 /// Whether or not to enable the I/O driver 46 enable_io: bool, 47 nevents: usize, 48 49 /// Whether or not to enable the time driver 50 enable_time: bool, 51 52 /// Whether or not the clock should start paused. 53 start_paused: bool, 54 55 /// The number of worker threads, used by Runtime. 56 /// 57 /// Only used when not using the current-thread executor. 58 worker_threads: Option<usize>, 59 60 /// Cap on thread usage. 61 max_blocking_threads: usize, 62 63 /// Name fn used for threads spawned by the runtime. 64 pub(super) thread_name: ThreadNameFn, 65 66 /// Stack size used for threads spawned by the runtime. 67 pub(super) thread_stack_size: Option<usize>, 68 69 /// Callback to run after each thread starts. 70 pub(super) after_start: Option<Callback>, 71 72 /// To run before each worker thread stops 73 pub(super) before_stop: Option<Callback>, 74 75 /// To run before each worker thread is parked. 76 pub(super) before_park: Option<Callback>, 77 78 /// To run after each thread is unparked. 79 pub(super) after_unpark: Option<Callback>, 80 81 /// Customizable keep alive timeout for BlockingPool 82 pub(super) keep_alive: Option<Duration>, 83 84 /// How many ticks before pulling a task from the global/remote queue? 85 /// 86 /// When `None`, the value is unspecified and behavior details are left to 87 /// the scheduler. Each scheduler flavor could choose to either pick its own 88 /// default value or use some other strategy to decide when to poll from the 89 /// global queue. For example, the multi-threaded scheduler uses a 90 /// self-tuning strategy based on mean task poll times. 91 pub(super) global_queue_interval: Option<u32>, 92 93 /// How many ticks before yielding to the driver for timer and I/O events? 94 pub(super) event_interval: u32, 95 96 pub(super) local_queue_capacity: usize, 97 98 /// When true, the multi-threade scheduler LIFO slot should not be used. 99 /// 100 /// This option should only be exposed as unstable. 101 pub(super) disable_lifo_slot: bool, 102 103 /// Specify a random number generator seed to provide deterministic results 104 pub(super) seed_generator: RngSeedGenerator, 105 106 /// When true, enables task poll count histogram instrumentation. 107 pub(super) metrics_poll_count_histogram_enable: bool, 108 109 /// Configures the task poll count histogram 110 pub(super) metrics_poll_count_histogram: HistogramBuilder, 111 112 #[cfg(tokio_unstable)] 113 pub(super) unhandled_panic: UnhandledPanic, 114 } 115 116 cfg_unstable! { 117 /// How the runtime should respond to unhandled panics. 118 /// 119 /// Instances of `UnhandledPanic` are passed to `Builder::unhandled_panic` 120 /// to configure the runtime behavior when a spawned task panics. 121 /// 122 /// See [`Builder::unhandled_panic`] for more details. 123 #[derive(Debug, Clone)] 124 #[non_exhaustive] 125 pub enum UnhandledPanic { 126 /// The runtime should ignore panics on spawned tasks. 127 /// 128 /// The panic is forwarded to the task's [`JoinHandle`] and all spawned 129 /// tasks continue running normally. 130 /// 131 /// This is the default behavior. 132 /// 133 /// # Examples 134 /// 135 /// ``` 136 /// use tokio::runtime::{self, UnhandledPanic}; 137 /// 138 /// # pub fn main() { 139 /// let rt = runtime::Builder::new_current_thread() 140 /// .unhandled_panic(UnhandledPanic::Ignore) 141 /// .build() 142 /// .unwrap(); 143 /// 144 /// let task1 = rt.spawn(async { panic!("boom"); }); 145 /// let task2 = rt.spawn(async { 146 /// // This task completes normally 147 /// "done" 148 /// }); 149 /// 150 /// rt.block_on(async { 151 /// // The panic on the first task is forwarded to the `JoinHandle` 152 /// assert!(task1.await.is_err()); 153 /// 154 /// // The second task completes normally 155 /// assert!(task2.await.is_ok()); 156 /// }) 157 /// # } 158 /// ``` 159 /// 160 /// [`JoinHandle`]: struct@crate::task::JoinHandle 161 Ignore, 162 163 /// The runtime should immediately shutdown if a spawned task panics. 164 /// 165 /// The runtime will immediately shutdown even if the panicked task's 166 /// [`JoinHandle`] is still available. All further spawned tasks will be 167 /// immediately dropped and call to [`Runtime::block_on`] will panic. 168 /// 169 /// # Examples 170 /// 171 /// ```should_panic 172 /// use tokio::runtime::{self, UnhandledPanic}; 173 /// 174 /// # pub fn main() { 175 /// let rt = runtime::Builder::new_current_thread() 176 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) 177 /// .build() 178 /// .unwrap(); 179 /// 180 /// rt.spawn(async { panic!("boom"); }); 181 /// rt.spawn(async { 182 /// // This task never completes. 183 /// }); 184 /// 185 /// rt.block_on(async { 186 /// // Do some work 187 /// # loop { tokio::task::yield_now().await; } 188 /// }) 189 /// # } 190 /// ``` 191 /// 192 /// [`JoinHandle`]: struct@crate::task::JoinHandle 193 ShutdownRuntime, 194 } 195 } 196 197 pub(crate) type ThreadNameFn = std::sync::Arc<dyn Fn() -> String + Send + Sync + 'static>; 198 199 #[derive(Clone, Copy)] 200 pub(crate) enum Kind { 201 CurrentThread, 202 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] 203 MultiThread, 204 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] 205 MultiThreadAlt, 206 } 207 208 impl Builder { 209 /// Returns a new builder with the current thread scheduler selected. 210 /// 211 /// Configuration methods can be chained on the return value. 212 /// 213 /// To spawn non-`Send` tasks on the resulting runtime, combine it with a 214 /// [`LocalSet`]. 215 /// 216 /// [`LocalSet`]: crate::task::LocalSet new_current_thread() -> Builder217 pub fn new_current_thread() -> Builder { 218 #[cfg(loom)] 219 const EVENT_INTERVAL: u32 = 4; 220 // The number `61` is fairly arbitrary. I believe this value was copied from golang. 221 #[cfg(not(loom))] 222 const EVENT_INTERVAL: u32 = 61; 223 224 Builder::new(Kind::CurrentThread, EVENT_INTERVAL) 225 } 226 227 cfg_not_wasi! { 228 /// Returns a new builder with the multi thread scheduler selected. 229 /// 230 /// Configuration methods can be chained on the return value. 231 #[cfg(feature = "rt-multi-thread")] 232 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] 233 pub fn new_multi_thread() -> Builder { 234 // The number `61` is fairly arbitrary. I believe this value was copied from golang. 235 Builder::new(Kind::MultiThread, 61) 236 } 237 238 cfg_unstable! { 239 /// Returns a new builder with the alternate multi thread scheduler 240 /// selected. 241 /// 242 /// The alternate multi threaded scheduler is an in-progress 243 /// candidate to replace the existing multi threaded scheduler. It 244 /// currently does not scale as well to 16+ processors. 245 /// 246 /// This runtime flavor is currently **not considered production 247 /// ready**. 248 /// 249 /// Configuration methods can be chained on the return value. 250 #[cfg(feature = "rt-multi-thread")] 251 #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))] 252 pub fn new_multi_thread_alt() -> Builder { 253 // The number `61` is fairly arbitrary. I believe this value was copied from golang. 254 Builder::new(Kind::MultiThreadAlt, 61) 255 } 256 } 257 } 258 259 /// Returns a new runtime builder initialized with default configuration 260 /// values. 261 /// 262 /// Configuration methods can be chained on the return value. new(kind: Kind, event_interval: u32) -> Builder263 pub(crate) fn new(kind: Kind, event_interval: u32) -> Builder { 264 Builder { 265 kind, 266 267 // I/O defaults to "off" 268 enable_io: false, 269 nevents: 1024, 270 271 // Time defaults to "off" 272 enable_time: false, 273 274 // The clock starts not-paused 275 start_paused: false, 276 277 // Read from environment variable first in multi-threaded mode. 278 // Default to lazy auto-detection (one thread per CPU core) 279 worker_threads: None, 280 281 max_blocking_threads: 512, 282 283 // Default thread name 284 thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), 285 286 // Do not set a stack size by default 287 thread_stack_size: None, 288 289 // No worker thread callbacks 290 after_start: None, 291 before_stop: None, 292 before_park: None, 293 after_unpark: None, 294 295 keep_alive: None, 296 297 // Defaults for these values depend on the scheduler kind, so we get them 298 // as parameters. 299 global_queue_interval: None, 300 event_interval, 301 302 #[cfg(not(loom))] 303 local_queue_capacity: 256, 304 305 #[cfg(loom)] 306 local_queue_capacity: 4, 307 308 seed_generator: RngSeedGenerator::new(RngSeed::new()), 309 310 #[cfg(tokio_unstable)] 311 unhandled_panic: UnhandledPanic::Ignore, 312 313 metrics_poll_count_histogram_enable: false, 314 315 metrics_poll_count_histogram: Default::default(), 316 317 disable_lifo_slot: false, 318 } 319 } 320 321 /// Enables both I/O and time drivers. 322 /// 323 /// Doing this is a shorthand for calling `enable_io` and `enable_time` 324 /// individually. If additional components are added to Tokio in the future, 325 /// `enable_all` will include these future components. 326 /// 327 /// # Examples 328 /// 329 /// ``` 330 /// use tokio::runtime; 331 /// 332 /// let rt = runtime::Builder::new_multi_thread() 333 /// .enable_all() 334 /// .build() 335 /// .unwrap(); 336 /// ``` enable_all(&mut self) -> &mut Self337 pub fn enable_all(&mut self) -> &mut Self { 338 #[cfg(any( 339 feature = "net", 340 all(unix, feature = "process"), 341 all(unix, feature = "signal") 342 ))] 343 self.enable_io(); 344 #[cfg(feature = "time")] 345 self.enable_time(); 346 347 self 348 } 349 350 /// Sets the number of worker threads the `Runtime` will use. 351 /// 352 /// This can be any number above 0 though it is advised to keep this value 353 /// on the smaller side. 354 /// 355 /// This will override the value read from environment variable `TOKIO_WORKER_THREADS`. 356 /// 357 /// # Default 358 /// 359 /// The default value is the number of cores available to the system. 360 /// 361 /// When using the `current_thread` runtime this method has no effect. 362 /// 363 /// # Examples 364 /// 365 /// ## Multi threaded runtime with 4 threads 366 /// 367 /// ``` 368 /// use tokio::runtime; 369 /// 370 /// // This will spawn a work-stealing runtime with 4 worker threads. 371 /// let rt = runtime::Builder::new_multi_thread() 372 /// .worker_threads(4) 373 /// .build() 374 /// .unwrap(); 375 /// 376 /// rt.spawn(async move {}); 377 /// ``` 378 /// 379 /// ## Current thread runtime (will only run on the current thread via `Runtime::block_on`) 380 /// 381 /// ``` 382 /// use tokio::runtime; 383 /// 384 /// // Create a runtime that _must_ be driven from a call 385 /// // to `Runtime::block_on`. 386 /// let rt = runtime::Builder::new_current_thread() 387 /// .build() 388 /// .unwrap(); 389 /// 390 /// // This will run the runtime and future on the current thread 391 /// rt.block_on(async move {}); 392 /// ``` 393 /// 394 /// # Panics 395 /// 396 /// This will panic if `val` is not larger than `0`. 397 #[track_caller] worker_threads(&mut self, val: usize) -> &mut Self398 pub fn worker_threads(&mut self, val: usize) -> &mut Self { 399 assert!(val > 0, "Worker threads cannot be set to 0"); 400 self.worker_threads = Some(val); 401 self 402 } 403 404 /// Specifies the limit for additional threads spawned by the Runtime. 405 /// 406 /// These threads are used for blocking operations like tasks spawned 407 /// through [`spawn_blocking`], this includes but is not limited to: 408 /// - [`fs`] operations 409 /// - dns resolution through [`ToSocketAddrs`] 410 /// - writing to [`Stdout`] or [`Stderr`] 411 /// - reading from [`Stdin`] 412 /// 413 /// Unlike the [`worker_threads`], they are not always active and will exit 414 /// if left idle for too long. You can change this timeout duration with [`thread_keep_alive`]. 415 /// 416 /// It's recommended to not set this limit too low in order to avoid hanging on operations 417 /// requiring [`spawn_blocking`]. 418 /// 419 /// The default value is 512. 420 /// 421 /// # Panics 422 /// 423 /// This will panic if `val` is not larger than `0`. 424 /// 425 /// # Upgrading from 0.x 426 /// 427 /// In old versions `max_threads` limited both blocking and worker threads, but the 428 /// current `max_blocking_threads` does not include async worker threads in the count. 429 /// 430 /// [`spawn_blocking`]: fn@crate::task::spawn_blocking 431 /// [`fs`]: mod@crate::fs 432 /// [`ToSocketAddrs`]: trait@crate::net::ToSocketAddrs 433 /// [`Stdout`]: struct@crate::io::Stdout 434 /// [`Stdin`]: struct@crate::io::Stdin 435 /// [`Stderr`]: struct@crate::io::Stderr 436 /// [`worker_threads`]: Self::worker_threads 437 /// [`thread_keep_alive`]: Self::thread_keep_alive 438 #[track_caller] 439 #[cfg_attr(docsrs, doc(alias = "max_threads"))] max_blocking_threads(&mut self, val: usize) -> &mut Self440 pub fn max_blocking_threads(&mut self, val: usize) -> &mut Self { 441 assert!(val > 0, "Max blocking threads cannot be set to 0"); 442 self.max_blocking_threads = val; 443 self 444 } 445 446 /// Sets name of threads spawned by the `Runtime`'s thread pool. 447 /// 448 /// The default name is "tokio-runtime-worker". 449 /// 450 /// # Examples 451 /// 452 /// ``` 453 /// # use tokio::runtime; 454 /// 455 /// # pub fn main() { 456 /// let rt = runtime::Builder::new_multi_thread() 457 /// .thread_name("my-pool") 458 /// .build(); 459 /// # } 460 /// ``` thread_name(&mut self, val: impl Into<String>) -> &mut Self461 pub fn thread_name(&mut self, val: impl Into<String>) -> &mut Self { 462 let val = val.into(); 463 self.thread_name = std::sync::Arc::new(move || val.clone()); 464 self 465 } 466 467 /// Sets a function used to generate the name of threads spawned by the `Runtime`'s thread pool. 468 /// 469 /// The default name fn is `|| "tokio-runtime-worker".into()`. 470 /// 471 /// # Examples 472 /// 473 /// ``` 474 /// # use tokio::runtime; 475 /// # use std::sync::atomic::{AtomicUsize, Ordering}; 476 /// # pub fn main() { 477 /// let rt = runtime::Builder::new_multi_thread() 478 /// .thread_name_fn(|| { 479 /// static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); 480 /// let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); 481 /// format!("my-pool-{}", id) 482 /// }) 483 /// .build(); 484 /// # } 485 /// ``` thread_name_fn<F>(&mut self, f: F) -> &mut Self where F: Fn() -> String + Send + Sync + 'static,486 pub fn thread_name_fn<F>(&mut self, f: F) -> &mut Self 487 where 488 F: Fn() -> String + Send + Sync + 'static, 489 { 490 self.thread_name = std::sync::Arc::new(f); 491 self 492 } 493 494 /// Sets the stack size (in bytes) for worker threads. 495 /// 496 /// The actual stack size may be greater than this value if the platform 497 /// specifies minimal stack size. 498 /// 499 /// The default stack size for spawned threads is 2 MiB, though this 500 /// particular stack size is subject to change in the future. 501 /// 502 /// # Examples 503 /// 504 /// ``` 505 /// # use tokio::runtime; 506 /// 507 /// # pub fn main() { 508 /// let rt = runtime::Builder::new_multi_thread() 509 /// .thread_stack_size(32 * 1024) 510 /// .build(); 511 /// # } 512 /// ``` thread_stack_size(&mut self, val: usize) -> &mut Self513 pub fn thread_stack_size(&mut self, val: usize) -> &mut Self { 514 self.thread_stack_size = Some(val); 515 self 516 } 517 518 /// Executes function `f` after each thread is started but before it starts 519 /// doing work. 520 /// 521 /// This is intended for bookkeeping and monitoring use cases. 522 /// 523 /// # Examples 524 /// 525 /// ``` 526 /// # use tokio::runtime; 527 /// # pub fn main() { 528 /// let runtime = runtime::Builder::new_multi_thread() 529 /// .on_thread_start(|| { 530 /// println!("thread started"); 531 /// }) 532 /// .build(); 533 /// # } 534 /// ``` 535 #[cfg(not(loom))] on_thread_start<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,536 pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self 537 where 538 F: Fn() + Send + Sync + 'static, 539 { 540 self.after_start = Some(std::sync::Arc::new(f)); 541 self 542 } 543 544 /// Executes function `f` before each thread stops. 545 /// 546 /// This is intended for bookkeeping and monitoring use cases. 547 /// 548 /// # Examples 549 /// 550 /// ``` 551 /// # use tokio::runtime; 552 /// # pub fn main() { 553 /// let runtime = runtime::Builder::new_multi_thread() 554 /// .on_thread_stop(|| { 555 /// println!("thread stopping"); 556 /// }) 557 /// .build(); 558 /// # } 559 /// ``` 560 #[cfg(not(loom))] on_thread_stop<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,561 pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self 562 where 563 F: Fn() + Send + Sync + 'static, 564 { 565 self.before_stop = Some(std::sync::Arc::new(f)); 566 self 567 } 568 569 /// Executes function `f` just before a thread is parked (goes idle). 570 /// `f` is called within the Tokio context, so functions like [`tokio::spawn`](crate::spawn) 571 /// can be called, and may result in this thread being unparked immediately. 572 /// 573 /// This can be used to start work only when the executor is idle, or for bookkeeping 574 /// and monitoring purposes. 575 /// 576 /// Note: There can only be one park callback for a runtime; calling this function 577 /// more than once replaces the last callback defined, rather than adding to it. 578 /// 579 /// # Examples 580 /// 581 /// ## Multithreaded executor 582 /// ``` 583 /// # use std::sync::Arc; 584 /// # use std::sync::atomic::{AtomicBool, Ordering}; 585 /// # use tokio::runtime; 586 /// # use tokio::sync::Barrier; 587 /// # pub fn main() { 588 /// let once = AtomicBool::new(true); 589 /// let barrier = Arc::new(Barrier::new(2)); 590 /// 591 /// let runtime = runtime::Builder::new_multi_thread() 592 /// .worker_threads(1) 593 /// .on_thread_park({ 594 /// let barrier = barrier.clone(); 595 /// move || { 596 /// let barrier = barrier.clone(); 597 /// if once.swap(false, Ordering::Relaxed) { 598 /// tokio::spawn(async move { barrier.wait().await; }); 599 /// } 600 /// } 601 /// }) 602 /// .build() 603 /// .unwrap(); 604 /// 605 /// runtime.block_on(async { 606 /// barrier.wait().await; 607 /// }) 608 /// # } 609 /// ``` 610 /// ## Current thread executor 611 /// ``` 612 /// # use std::sync::Arc; 613 /// # use std::sync::atomic::{AtomicBool, Ordering}; 614 /// # use tokio::runtime; 615 /// # use tokio::sync::Barrier; 616 /// # pub fn main() { 617 /// let once = AtomicBool::new(true); 618 /// let barrier = Arc::new(Barrier::new(2)); 619 /// 620 /// let runtime = runtime::Builder::new_current_thread() 621 /// .on_thread_park({ 622 /// let barrier = barrier.clone(); 623 /// move || { 624 /// let barrier = barrier.clone(); 625 /// if once.swap(false, Ordering::Relaxed) { 626 /// tokio::spawn(async move { barrier.wait().await; }); 627 /// } 628 /// } 629 /// }) 630 /// .build() 631 /// .unwrap(); 632 /// 633 /// runtime.block_on(async { 634 /// barrier.wait().await; 635 /// }) 636 /// # } 637 /// ``` 638 #[cfg(not(loom))] on_thread_park<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,639 pub fn on_thread_park<F>(&mut self, f: F) -> &mut Self 640 where 641 F: Fn() + Send + Sync + 'static, 642 { 643 self.before_park = Some(std::sync::Arc::new(f)); 644 self 645 } 646 647 /// Executes function `f` just after a thread unparks (starts executing tasks). 648 /// 649 /// This is intended for bookkeeping and monitoring use cases; note that work 650 /// in this callback will increase latencies when the application has allowed one or 651 /// more runtime threads to go idle. 652 /// 653 /// Note: There can only be one unpark callback for a runtime; calling this function 654 /// more than once replaces the last callback defined, rather than adding to it. 655 /// 656 /// # Examples 657 /// 658 /// ``` 659 /// # use tokio::runtime; 660 /// # pub fn main() { 661 /// let runtime = runtime::Builder::new_multi_thread() 662 /// .on_thread_unpark(|| { 663 /// println!("thread unparking"); 664 /// }) 665 /// .build(); 666 /// 667 /// runtime.unwrap().block_on(async { 668 /// tokio::task::yield_now().await; 669 /// println!("Hello from Tokio!"); 670 /// }) 671 /// # } 672 /// ``` 673 #[cfg(not(loom))] on_thread_unpark<F>(&mut self, f: F) -> &mut Self where F: Fn() + Send + Sync + 'static,674 pub fn on_thread_unpark<F>(&mut self, f: F) -> &mut Self 675 where 676 F: Fn() + Send + Sync + 'static, 677 { 678 self.after_unpark = Some(std::sync::Arc::new(f)); 679 self 680 } 681 682 /// Creates the configured `Runtime`. 683 /// 684 /// The returned `Runtime` instance is ready to spawn tasks. 685 /// 686 /// # Examples 687 /// 688 /// ``` 689 /// use tokio::runtime::Builder; 690 /// 691 /// let rt = Builder::new_multi_thread().build().unwrap(); 692 /// 693 /// rt.block_on(async { 694 /// println!("Hello from the Tokio runtime"); 695 /// }); 696 /// ``` build(&mut self) -> io::Result<Runtime>697 pub fn build(&mut self) -> io::Result<Runtime> { 698 match &self.kind { 699 Kind::CurrentThread => self.build_current_thread_runtime(), 700 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] 701 Kind::MultiThread => self.build_threaded_runtime(), 702 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] 703 Kind::MultiThreadAlt => self.build_alt_threaded_runtime(), 704 } 705 } 706 get_cfg(&self) -> driver::Cfg707 fn get_cfg(&self) -> driver::Cfg { 708 driver::Cfg { 709 enable_pause_time: match self.kind { 710 Kind::CurrentThread => true, 711 #[cfg(all(feature = "rt-multi-thread", not(target_os = "wasi")))] 712 Kind::MultiThread => false, 713 #[cfg(all(tokio_unstable, feature = "rt-multi-thread", not(target_os = "wasi")))] 714 Kind::MultiThreadAlt => false, 715 }, 716 enable_io: self.enable_io, 717 enable_time: self.enable_time, 718 start_paused: self.start_paused, 719 nevents: self.nevents, 720 } 721 } 722 723 /// Sets a custom timeout for a thread in the blocking pool. 724 /// 725 /// By default, the timeout for a thread is set to 10 seconds. This can 726 /// be overridden using .thread_keep_alive(). 727 /// 728 /// # Example 729 /// 730 /// ``` 731 /// # use tokio::runtime; 732 /// # use std::time::Duration; 733 /// # pub fn main() { 734 /// let rt = runtime::Builder::new_multi_thread() 735 /// .thread_keep_alive(Duration::from_millis(100)) 736 /// .build(); 737 /// # } 738 /// ``` thread_keep_alive(&mut self, duration: Duration) -> &mut Self739 pub fn thread_keep_alive(&mut self, duration: Duration) -> &mut Self { 740 self.keep_alive = Some(duration); 741 self 742 } 743 744 /// Sets the number of scheduler ticks after which the scheduler will poll the global 745 /// task queue. 746 /// 747 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. 748 /// 749 /// By default the global queue interval is: 750 /// 751 /// * `31` for the current-thread scheduler. 752 /// * `61` for the multithreaded scheduler. 753 /// 754 /// Schedulers have a local queue of already-claimed tasks, and a global queue of incoming 755 /// tasks. Setting the interval to a smaller value increases the fairness of the scheduler, 756 /// at the cost of more synchronization overhead. That can be beneficial for prioritizing 757 /// getting started on new work, especially if tasks frequently yield rather than complete 758 /// or await on further I/O. Conversely, a higher value prioritizes existing work, and 759 /// is a good choice when most tasks quickly complete polling. 760 /// 761 /// # Examples 762 /// 763 /// ``` 764 /// # use tokio::runtime; 765 /// # pub fn main() { 766 /// let rt = runtime::Builder::new_multi_thread() 767 /// .global_queue_interval(31) 768 /// .build(); 769 /// # } 770 /// ``` global_queue_interval(&mut self, val: u32) -> &mut Self771 pub fn global_queue_interval(&mut self, val: u32) -> &mut Self { 772 self.global_queue_interval = Some(val); 773 self 774 } 775 776 /// Sets the number of scheduler ticks after which the scheduler will poll for 777 /// external events (timers, I/O, and so on). 778 /// 779 /// A scheduler "tick" roughly corresponds to one `poll` invocation on a task. 780 /// 781 /// By default, the event interval is `61` for all scheduler types. 782 /// 783 /// Setting the event interval determines the effective "priority" of delivering 784 /// these external events (which may wake up additional tasks), compared to 785 /// executing tasks that are currently ready to run. A smaller value is useful 786 /// when tasks frequently spend a long time in polling, or frequently yield, 787 /// which can result in overly long delays picking up I/O events. Conversely, 788 /// picking up new events requires extra synchronization and syscall overhead, 789 /// so if tasks generally complete their polling quickly, a higher event interval 790 /// will minimize that overhead while still keeping the scheduler responsive to 791 /// events. 792 /// 793 /// # Examples 794 /// 795 /// ``` 796 /// # use tokio::runtime; 797 /// # pub fn main() { 798 /// let rt = runtime::Builder::new_multi_thread() 799 /// .event_interval(31) 800 /// .build(); 801 /// # } 802 /// ``` event_interval(&mut self, val: u32) -> &mut Self803 pub fn event_interval(&mut self, val: u32) -> &mut Self { 804 self.event_interval = val; 805 self 806 } 807 808 cfg_unstable! { 809 /// Configure how the runtime responds to an unhandled panic on a 810 /// spawned task. 811 /// 812 /// By default, an unhandled panic (i.e. a panic not caught by 813 /// [`std::panic::catch_unwind`]) has no impact on the runtime's 814 /// execution. The panic is error value is forwarded to the task's 815 /// [`JoinHandle`] and all other spawned tasks continue running. 816 /// 817 /// The `unhandled_panic` option enables configuring this behavior. 818 /// 819 /// * `UnhandledPanic::Ignore` is the default behavior. Panics on 820 /// spawned tasks have no impact on the runtime's execution. 821 /// * `UnhandledPanic::ShutdownRuntime` will force the runtime to 822 /// shutdown immediately when a spawned task panics even if that 823 /// task's `JoinHandle` has not been dropped. All other spawned tasks 824 /// will immediately terminate and further calls to 825 /// [`Runtime::block_on`] will panic. 826 /// 827 /// # Unstable 828 /// 829 /// This option is currently unstable and its implementation is 830 /// incomplete. The API may change or be removed in the future. See 831 /// tokio-rs/tokio#4516 for more details. 832 /// 833 /// # Examples 834 /// 835 /// The following demonstrates a runtime configured to shutdown on 836 /// panic. The first spawned task panics and results in the runtime 837 /// shutting down. The second spawned task never has a chance to 838 /// execute. The call to `block_on` will panic due to the runtime being 839 /// forcibly shutdown. 840 /// 841 /// ```should_panic 842 /// use tokio::runtime::{self, UnhandledPanic}; 843 /// 844 /// # pub fn main() { 845 /// let rt = runtime::Builder::new_current_thread() 846 /// .unhandled_panic(UnhandledPanic::ShutdownRuntime) 847 /// .build() 848 /// .unwrap(); 849 /// 850 /// rt.spawn(async { panic!("boom"); }); 851 /// rt.spawn(async { 852 /// // This task never completes. 853 /// }); 854 /// 855 /// rt.block_on(async { 856 /// // Do some work 857 /// # loop { tokio::task::yield_now().await; } 858 /// }) 859 /// # } 860 /// ``` 861 /// 862 /// [`JoinHandle`]: struct@crate::task::JoinHandle 863 pub fn unhandled_panic(&mut self, behavior: UnhandledPanic) -> &mut Self { 864 self.unhandled_panic = behavior; 865 self 866 } 867 868 /// Disables the LIFO task scheduler heuristic. 869 /// 870 /// The multi-threaded scheduler includes a heuristic for optimizing 871 /// message-passing patterns. This heuristic results in the **last** 872 /// scheduled task being polled first. 873 /// 874 /// To implement this heuristic, each worker thread has a slot which 875 /// holds the task that should be polled next. However, this slot cannot 876 /// be stolen by other worker threads, which can result in lower total 877 /// throughput when tasks tend to have longer poll times. 878 /// 879 /// This configuration option will disable this heuristic resulting in 880 /// all scheduled tasks being pushed into the worker-local queue, which 881 /// is stealable. 882 /// 883 /// Consider trying this option when the task "scheduled" time is high 884 /// but the runtime is underutilized. Use tokio-rs/tokio-metrics to 885 /// collect this data. 886 /// 887 /// # Unstable 888 /// 889 /// This configuration option is considered a workaround for the LIFO 890 /// slot not being stealable. When the slot becomes stealable, we will 891 /// revisit whether or not this option is necessary. See 892 /// tokio-rs/tokio#4941. 893 /// 894 /// # Examples 895 /// 896 /// ``` 897 /// use tokio::runtime; 898 /// 899 /// let rt = runtime::Builder::new_multi_thread() 900 /// .disable_lifo_slot() 901 /// .build() 902 /// .unwrap(); 903 /// ``` 904 pub fn disable_lifo_slot(&mut self) -> &mut Self { 905 self.disable_lifo_slot = true; 906 self 907 } 908 909 /// Specifies the random number generation seed to use within all 910 /// threads associated with the runtime being built. 911 /// 912 /// This option is intended to make certain parts of the runtime 913 /// deterministic (e.g. the [`tokio::select!`] macro). In the case of 914 /// [`tokio::select!`] it will ensure that the order that branches are 915 /// polled is deterministic. 916 /// 917 /// In addition to the code specifying `rng_seed` and interacting with 918 /// the runtime, the internals of Tokio and the Rust compiler may affect 919 /// the sequences of random numbers. In order to ensure repeatable 920 /// results, the version of Tokio, the versions of all other 921 /// dependencies that interact with Tokio, and the Rust compiler version 922 /// should also all remain constant. 923 /// 924 /// # Examples 925 /// 926 /// ``` 927 /// # use tokio::runtime::{self, RngSeed}; 928 /// # pub fn main() { 929 /// let seed = RngSeed::from_bytes(b"place your seed here"); 930 /// let rt = runtime::Builder::new_current_thread() 931 /// .rng_seed(seed) 932 /// .build(); 933 /// # } 934 /// ``` 935 /// 936 /// [`tokio::select!`]: crate::select 937 pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { 938 self.seed_generator = RngSeedGenerator::new(seed); 939 self 940 } 941 } 942 943 cfg_metrics! { 944 /// Enables tracking the distribution of task poll times. 945 /// 946 /// Task poll times are not instrumented by default as doing so requires 947 /// calling [`Instant::now()`] twice per task poll, which could add 948 /// measurable overhead. Use the [`Handle::metrics()`] to access the 949 /// metrics data. 950 /// 951 /// The histogram uses fixed bucket sizes. In other words, the histogram 952 /// buckets are not dynamic based on input values. Use the 953 /// `metrics_poll_count_histogram_` builder methods to configure the 954 /// histogram details. 955 /// 956 /// # Examples 957 /// 958 /// ``` 959 /// use tokio::runtime; 960 /// 961 /// let rt = runtime::Builder::new_multi_thread() 962 /// .enable_metrics_poll_count_histogram() 963 /// .build() 964 /// .unwrap(); 965 /// # // Test default values here 966 /// # fn us(n: u64) -> std::time::Duration { std::time::Duration::from_micros(n) } 967 /// # let m = rt.handle().metrics(); 968 /// # assert_eq!(m.poll_count_histogram_num_buckets(), 10); 969 /// # assert_eq!(m.poll_count_histogram_bucket_range(0), us(0)..us(100)); 970 /// # assert_eq!(m.poll_count_histogram_bucket_range(1), us(100)..us(200)); 971 /// ``` 972 /// 973 /// [`Handle::metrics()`]: crate::runtime::Handle::metrics 974 /// [`Instant::now()`]: std::time::Instant::now 975 pub fn enable_metrics_poll_count_histogram(&mut self) -> &mut Self { 976 self.metrics_poll_count_histogram_enable = true; 977 self 978 } 979 980 /// Sets the histogram scale for tracking the distribution of task poll 981 /// times. 982 /// 983 /// Tracking the distribution of task poll times can be done using a 984 /// linear or log scale. When using linear scale, each histogram bucket 985 /// will represent the same range of poll times. When using log scale, 986 /// each histogram bucket will cover a range twice as big as the 987 /// previous bucket. 988 /// 989 /// **Default:** linear scale. 990 /// 991 /// # Examples 992 /// 993 /// ``` 994 /// use tokio::runtime::{self, HistogramScale}; 995 /// 996 /// let rt = runtime::Builder::new_multi_thread() 997 /// .enable_metrics_poll_count_histogram() 998 /// .metrics_poll_count_histogram_scale(HistogramScale::Log) 999 /// .build() 1000 /// .unwrap(); 1001 /// ``` 1002 pub fn metrics_poll_count_histogram_scale(&mut self, histogram_scale: crate::runtime::HistogramScale) -> &mut Self { 1003 self.metrics_poll_count_histogram.scale = histogram_scale; 1004 self 1005 } 1006 1007 /// Sets the histogram resolution for tracking the distribution of task 1008 /// poll times. 1009 /// 1010 /// The resolution is the histogram's first bucket's range. When using a 1011 /// linear histogram scale, each bucket will cover the same range. When 1012 /// using a log scale, each bucket will cover a range twice as big as 1013 /// the previous bucket. In the log case, the resolution represents the 1014 /// smallest bucket range. 1015 /// 1016 /// Note that, when using log scale, the resolution is rounded up to the 1017 /// nearest power of 2 in nanoseconds. 1018 /// 1019 /// **Default:** 100 microseconds. 1020 /// 1021 /// # Examples 1022 /// 1023 /// ``` 1024 /// use tokio::runtime; 1025 /// use std::time::Duration; 1026 /// 1027 /// let rt = runtime::Builder::new_multi_thread() 1028 /// .enable_metrics_poll_count_histogram() 1029 /// .metrics_poll_count_histogram_resolution(Duration::from_micros(100)) 1030 /// .build() 1031 /// .unwrap(); 1032 /// ``` 1033 pub fn metrics_poll_count_histogram_resolution(&mut self, resolution: Duration) -> &mut Self { 1034 assert!(resolution > Duration::from_secs(0)); 1035 // Sanity check the argument and also make the cast below safe. 1036 assert!(resolution <= Duration::from_secs(1)); 1037 1038 let resolution = resolution.as_nanos() as u64; 1039 self.metrics_poll_count_histogram.resolution = resolution; 1040 self 1041 } 1042 1043 /// Sets the number of buckets for the histogram tracking the 1044 /// distribution of task poll times. 1045 /// 1046 /// The last bucket tracks all greater values that fall out of other 1047 /// ranges. So, configuring the histogram using a linear scale, 1048 /// resolution of 50ms, and 10 buckets, the 10th bucket will track task 1049 /// polls that take more than 450ms to complete. 1050 /// 1051 /// **Default:** 10 1052 /// 1053 /// # Examples 1054 /// 1055 /// ``` 1056 /// use tokio::runtime; 1057 /// 1058 /// let rt = runtime::Builder::new_multi_thread() 1059 /// .enable_metrics_poll_count_histogram() 1060 /// .metrics_poll_count_histogram_buckets(15) 1061 /// .build() 1062 /// .unwrap(); 1063 /// ``` 1064 pub fn metrics_poll_count_histogram_buckets(&mut self, buckets: usize) -> &mut Self { 1065 self.metrics_poll_count_histogram.num_buckets = buckets; 1066 self 1067 } 1068 } 1069 1070 cfg_loom! { 1071 pub(crate) fn local_queue_capacity(&mut self, value: usize) -> &mut Self { 1072 assert!(value.is_power_of_two()); 1073 self.local_queue_capacity = value; 1074 self 1075 } 1076 } 1077 build_current_thread_runtime(&mut self) -> io::Result<Runtime>1078 fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> { 1079 use crate::runtime::scheduler::{self, CurrentThread}; 1080 use crate::runtime::{runtime::Scheduler, Config}; 1081 1082 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; 1083 1084 // Blocking pool 1085 let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); 1086 let blocking_spawner = blocking_pool.spawner().clone(); 1087 1088 // Generate a rng seed for this runtime. 1089 let seed_generator_1 = self.seed_generator.next_generator(); 1090 let seed_generator_2 = self.seed_generator.next_generator(); 1091 1092 // And now put a single-threaded scheduler on top of the timer. When 1093 // there are no futures ready to do something, it'll let the timer or 1094 // the reactor to generate some new stimuli for the futures to continue 1095 // in their life. 1096 let (scheduler, handle) = CurrentThread::new( 1097 driver, 1098 driver_handle, 1099 blocking_spawner, 1100 seed_generator_2, 1101 Config { 1102 before_park: self.before_park.clone(), 1103 after_unpark: self.after_unpark.clone(), 1104 global_queue_interval: self.global_queue_interval, 1105 event_interval: self.event_interval, 1106 local_queue_capacity: self.local_queue_capacity, 1107 #[cfg(tokio_unstable)] 1108 unhandled_panic: self.unhandled_panic.clone(), 1109 disable_lifo_slot: self.disable_lifo_slot, 1110 seed_generator: seed_generator_1, 1111 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), 1112 }, 1113 ); 1114 1115 let handle = Handle { 1116 inner: scheduler::Handle::CurrentThread(handle), 1117 }; 1118 1119 Ok(Runtime::from_parts( 1120 Scheduler::CurrentThread(scheduler), 1121 handle, 1122 blocking_pool, 1123 )) 1124 } 1125 metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder>1126 fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> { 1127 if self.metrics_poll_count_histogram_enable { 1128 Some(self.metrics_poll_count_histogram.clone()) 1129 } else { 1130 None 1131 } 1132 } 1133 } 1134 1135 cfg_io_driver! { 1136 impl Builder { 1137 /// Enables the I/O driver. 1138 /// 1139 /// Doing this enables using net, process, signal, and some I/O types on 1140 /// the runtime. 1141 /// 1142 /// # Examples 1143 /// 1144 /// ``` 1145 /// use tokio::runtime; 1146 /// 1147 /// let rt = runtime::Builder::new_multi_thread() 1148 /// .enable_io() 1149 /// .build() 1150 /// .unwrap(); 1151 /// ``` 1152 pub fn enable_io(&mut self) -> &mut Self { 1153 self.enable_io = true; 1154 self 1155 } 1156 1157 /// Enables the I/O driver and configures the max number of events to be 1158 /// processed per tick. 1159 /// 1160 /// # Examples 1161 /// 1162 /// ``` 1163 /// use tokio::runtime; 1164 /// 1165 /// let rt = runtime::Builder::new_current_thread() 1166 /// .enable_io() 1167 /// .max_io_events_per_tick(1024) 1168 /// .build() 1169 /// .unwrap(); 1170 /// ``` 1171 pub fn max_io_events_per_tick(&mut self, capacity: usize) -> &mut Self { 1172 self.nevents = capacity; 1173 self 1174 } 1175 } 1176 } 1177 1178 cfg_time! { 1179 impl Builder { 1180 /// Enables the time driver. 1181 /// 1182 /// Doing this enables using `tokio::time` on the runtime. 1183 /// 1184 /// # Examples 1185 /// 1186 /// ``` 1187 /// use tokio::runtime; 1188 /// 1189 /// let rt = runtime::Builder::new_multi_thread() 1190 /// .enable_time() 1191 /// .build() 1192 /// .unwrap(); 1193 /// ``` 1194 pub fn enable_time(&mut self) -> &mut Self { 1195 self.enable_time = true; 1196 self 1197 } 1198 } 1199 } 1200 1201 cfg_test_util! { 1202 impl Builder { 1203 /// Controls if the runtime's clock starts paused or advancing. 1204 /// 1205 /// Pausing time requires the current-thread runtime; construction of 1206 /// the runtime will panic otherwise. 1207 /// 1208 /// # Examples 1209 /// 1210 /// ``` 1211 /// use tokio::runtime; 1212 /// 1213 /// let rt = runtime::Builder::new_current_thread() 1214 /// .enable_time() 1215 /// .start_paused(true) 1216 /// .build() 1217 /// .unwrap(); 1218 /// ``` 1219 pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { 1220 self.start_paused = start_paused; 1221 self 1222 } 1223 } 1224 } 1225 1226 cfg_rt_multi_thread! { 1227 impl Builder { 1228 fn build_threaded_runtime(&mut self) -> io::Result<Runtime> { 1229 use crate::loom::sys::num_cpus; 1230 use crate::runtime::{Config, runtime::Scheduler}; 1231 use crate::runtime::scheduler::{self, MultiThread}; 1232 1233 let core_threads = self.worker_threads.unwrap_or_else(num_cpus); 1234 1235 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; 1236 1237 // Create the blocking pool 1238 let blocking_pool = 1239 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); 1240 let blocking_spawner = blocking_pool.spawner().clone(); 1241 1242 // Generate a rng seed for this runtime. 1243 let seed_generator_1 = self.seed_generator.next_generator(); 1244 let seed_generator_2 = self.seed_generator.next_generator(); 1245 1246 let (scheduler, handle, launch) = MultiThread::new( 1247 core_threads, 1248 driver, 1249 driver_handle, 1250 blocking_spawner, 1251 seed_generator_2, 1252 Config { 1253 before_park: self.before_park.clone(), 1254 after_unpark: self.after_unpark.clone(), 1255 global_queue_interval: self.global_queue_interval, 1256 event_interval: self.event_interval, 1257 local_queue_capacity: self.local_queue_capacity, 1258 #[cfg(tokio_unstable)] 1259 unhandled_panic: self.unhandled_panic.clone(), 1260 disable_lifo_slot: self.disable_lifo_slot, 1261 seed_generator: seed_generator_1, 1262 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), 1263 }, 1264 ); 1265 1266 let handle = Handle { inner: scheduler::Handle::MultiThread(handle) }; 1267 1268 // Spawn the thread pool workers 1269 let _enter = handle.enter(); 1270 launch.launch(); 1271 1272 Ok(Runtime::from_parts(Scheduler::MultiThread(scheduler), handle, blocking_pool)) 1273 } 1274 1275 cfg_unstable! { 1276 fn build_alt_threaded_runtime(&mut self) -> io::Result<Runtime> { 1277 use crate::loom::sys::num_cpus; 1278 use crate::runtime::{Config, runtime::Scheduler}; 1279 use crate::runtime::scheduler::MultiThreadAlt; 1280 1281 let core_threads = self.worker_threads.unwrap_or_else(num_cpus); 1282 1283 let (driver, driver_handle) = driver::Driver::new(self.get_cfg())?; 1284 1285 // Create the blocking pool 1286 let blocking_pool = 1287 blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); 1288 let blocking_spawner = blocking_pool.spawner().clone(); 1289 1290 // Generate a rng seed for this runtime. 1291 let seed_generator_1 = self.seed_generator.next_generator(); 1292 let seed_generator_2 = self.seed_generator.next_generator(); 1293 1294 let (scheduler, handle) = MultiThreadAlt::new( 1295 core_threads, 1296 driver, 1297 driver_handle, 1298 blocking_spawner, 1299 seed_generator_2, 1300 Config { 1301 before_park: self.before_park.clone(), 1302 after_unpark: self.after_unpark.clone(), 1303 global_queue_interval: self.global_queue_interval, 1304 event_interval: self.event_interval, 1305 local_queue_capacity: self.local_queue_capacity, 1306 #[cfg(tokio_unstable)] 1307 unhandled_panic: self.unhandled_panic.clone(), 1308 disable_lifo_slot: self.disable_lifo_slot, 1309 seed_generator: seed_generator_1, 1310 metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(), 1311 }, 1312 ); 1313 1314 Ok(Runtime::from_parts(Scheduler::MultiThreadAlt(scheduler), handle, blocking_pool)) 1315 } 1316 } 1317 } 1318 } 1319 1320 impl fmt::Debug for Builder { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result1321 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 1322 fmt.debug_struct("Builder") 1323 .field("worker_threads", &self.worker_threads) 1324 .field("max_blocking_threads", &self.max_blocking_threads) 1325 .field( 1326 "thread_name", 1327 &"<dyn Fn() -> String + Send + Sync + 'static>", 1328 ) 1329 .field("thread_stack_size", &self.thread_stack_size) 1330 .field("after_start", &self.after_start.as_ref().map(|_| "...")) 1331 .field("before_stop", &self.before_stop.as_ref().map(|_| "...")) 1332 .field("before_park", &self.before_park.as_ref().map(|_| "...")) 1333 .field("after_unpark", &self.after_unpark.as_ref().map(|_| "...")) 1334 .finish() 1335 } 1336 } 1337