• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! The Tokio runtime.
2 //!
3 //! Unlike other Rust programs, asynchronous applications require runtime
4 //! support. In particular, the following runtime services are necessary:
5 //!
6 //! * An **I/O event loop**, called the driver, which drives I/O resources and
7 //!   dispatches I/O events to tasks that depend on them.
8 //! * A **scheduler** to execute [tasks] that use these I/O resources.
9 //! * A **timer** for scheduling work to run after a set period of time.
10 //!
11 //! Tokio's [`Runtime`] bundles all of these services as a single type, allowing
12 //! them to be started, shut down, and configured together. However, often it is
13 //! not required to configure a [`Runtime`] manually, and a user may just use the
14 //! [`tokio::main`] attribute macro, which creates a [`Runtime`] under the hood.
15 //!
16 //! # Usage
17 //!
18 //! When no fine tuning is required, the [`tokio::main`] attribute macro can be
19 //! used.
20 //!
21 //! ```no_run
22 //! use tokio::net::TcpListener;
23 //! use tokio::io::{AsyncReadExt, AsyncWriteExt};
24 //!
25 //! #[tokio::main]
26 //! async fn main() -> Result<(), Box<dyn std::error::Error>> {
27 //!     let listener = TcpListener::bind("127.0.0.1:8080").await?;
28 //!
29 //!     loop {
30 //!         let (mut socket, _) = listener.accept().await?;
31 //!
32 //!         tokio::spawn(async move {
33 //!             let mut buf = [0; 1024];
34 //!
35 //!             // In a loop, read data from the socket and write the data back.
36 //!             loop {
37 //!                 let n = match socket.read(&mut buf).await {
38 //!                     // socket closed
39 //!                     Ok(n) if n == 0 => return,
40 //!                     Ok(n) => n,
41 //!                     Err(e) => {
42 //!                         println!("failed to read from socket; err = {:?}", e);
43 //!                         return;
44 //!                     }
45 //!                 };
46 //!
47 //!                 // Write the data back
48 //!                 if let Err(e) = socket.write_all(&buf[0..n]).await {
49 //!                     println!("failed to write to socket; err = {:?}", e);
50 //!                     return;
51 //!                 }
52 //!             }
53 //!         });
54 //!     }
55 //! }
56 //! ```
57 //!
58 //! From within the context of the runtime, additional tasks are spawned using
59 //! the [`tokio::spawn`] function. Futures spawned using this function will be
60 //! executed on the same thread pool used by the [`Runtime`].
61 //!
62 //! A [`Runtime`] instance can also be used directly.
63 //!
64 //! ```no_run
65 //! use tokio::net::TcpListener;
66 //! use tokio::io::{AsyncReadExt, AsyncWriteExt};
67 //! use tokio::runtime::Runtime;
68 //!
69 //! fn main() -> Result<(), Box<dyn std::error::Error>> {
70 //!     // Create the runtime
71 //!     let rt  = Runtime::new()?;
72 //!
73 //!     // Spawn the root task
74 //!     rt.block_on(async {
75 //!         let listener = TcpListener::bind("127.0.0.1:8080").await?;
76 //!
77 //!         loop {
78 //!             let (mut socket, _) = listener.accept().await?;
79 //!
80 //!             tokio::spawn(async move {
81 //!                 let mut buf = [0; 1024];
82 //!
83 //!                 // In a loop, read data from the socket and write the data back.
84 //!                 loop {
85 //!                     let n = match socket.read(&mut buf).await {
86 //!                         // socket closed
87 //!                         Ok(n) if n == 0 => return,
88 //!                         Ok(n) => n,
89 //!                         Err(e) => {
90 //!                             println!("failed to read from socket; err = {:?}", e);
91 //!                             return;
92 //!                         }
93 //!                     };
94 //!
95 //!                     // Write the data back
96 //!                     if let Err(e) = socket.write_all(&buf[0..n]).await {
97 //!                         println!("failed to write to socket; err = {:?}", e);
98 //!                         return;
99 //!                     }
100 //!                 }
101 //!             });
102 //!         }
103 //!     })
104 //! }
105 //! ```
106 //!
107 //! ## Runtime Configurations
108 //!
109 //! Tokio provides multiple task scheduling strategies, suitable for different
110 //! applications. The [runtime builder] or `#[tokio::main]` attribute may be
111 //! used to select which scheduler to use.
112 //!
113 //! #### Multi-Thread Scheduler
114 //!
115 //! The multi-thread scheduler executes futures on a _thread pool_, using a
116 //! work-stealing strategy. By default, it will start a worker thread for each
117 //! CPU core available on the system. This tends to be the ideal configuration
118 //! for most applications. The multi-thread scheduler requires the `rt-multi-thread`
119 //! feature flag, and is selected by default:
120 //! ```
121 //! use tokio::runtime;
122 //!
123 //! # fn main() -> Result<(), Box<dyn std::error::Error>> {
124 //! let threaded_rt = runtime::Runtime::new()?;
125 //! # Ok(()) }
126 //! ```
127 //!
128 //! Most applications should use the multi-thread scheduler, except in some
129 //! niche use-cases, such as when running only a single thread is required.
130 //!
131 //! #### Current-Thread Scheduler
132 //!
133 //! The current-thread scheduler provides a _single-threaded_ future executor.
134 //! All tasks will be created and executed on the current thread. This requires
135 //! the `rt` feature flag.
136 //! ```
137 //! use tokio::runtime;
138 //!
139 //! # fn main() -> Result<(), Box<dyn std::error::Error>> {
140 //! let basic_rt = runtime::Builder::new_current_thread()
141 //!     .build()?;
142 //! # Ok(()) }
143 //! ```
144 //!
145 //! #### Resource drivers
146 //!
147 //! When configuring a runtime by hand, no resource drivers are enabled by
148 //! default. In this case, attempting to use networking types or time types will
149 //! fail. In order to enable these types, the resource drivers must be enabled.
150 //! This is done with [`Builder::enable_io`] and [`Builder::enable_time`]. As a
151 //! shorthand, [`Builder::enable_all`] enables both resource drivers.
152 //!
153 //! ## Lifetime of spawned threads
154 //!
155 //! The runtime may spawn threads depending on its configuration and usage. The
156 //! multi-thread scheduler spawns threads to schedule tasks and for `spawn_blocking`
157 //! calls.
158 //!
159 //! While the `Runtime` is active, threads may shutdown after periods of being
160 //! idle. Once `Runtime` is dropped, all runtime threads are forcibly shutdown.
161 //! Any tasks that have not yet completed will be dropped.
162 //!
163 //! [tasks]: crate::task
164 //! [`Runtime`]: Runtime
165 //! [`tokio::spawn`]: crate::spawn
166 //! [`tokio::main`]: ../attr.main.html
167 //! [runtime builder]: crate::runtime::Builder
168 //! [`Runtime::new`]: crate::runtime::Runtime::new
169 //! [`Builder::basic_scheduler`]: crate::runtime::Builder::basic_scheduler
170 //! [`Builder::threaded_scheduler`]: crate::runtime::Builder::threaded_scheduler
171 //! [`Builder::enable_io`]: crate::runtime::Builder::enable_io
172 //! [`Builder::enable_time`]: crate::runtime::Builder::enable_time
173 //! [`Builder::enable_all`]: crate::runtime::Builder::enable_all
174 
175 // At the top due to macros
176 #[cfg(test)]
177 #[macro_use]
178 mod tests;
179 
180 pub(crate) mod enter;
181 
182 pub(crate) mod task;
183 
184 cfg_stats! {
185     pub mod stats;
186 }
187 cfg_not_stats! {
188     pub(crate) mod stats;
189 }
190 
191 cfg_rt! {
192     mod basic_scheduler;
193     use basic_scheduler::BasicScheduler;
194 
195     mod blocking;
196     use blocking::BlockingPool;
197     pub(crate) use blocking::spawn_blocking;
198 
199     mod builder;
200     pub use self::builder::Builder;
201 
202     pub(crate) mod context;
203     pub(crate) mod driver;
204 
205     use self::enter::enter;
206 
207     mod handle;
208     pub use handle::{EnterGuard, Handle, TryCurrentError};
209 
210     mod spawner;
211     use self::spawner::Spawner;
212 }
213 
214 cfg_rt_multi_thread! {
215     mod park;
216     use park::Parker;
217 }
218 
219 cfg_rt_multi_thread! {
220     mod queue;
221 
222     pub(crate) mod thread_pool;
223     use self::thread_pool::ThreadPool;
224 }
225 
226 cfg_rt! {
227     use crate::task::JoinHandle;
228 
229     use std::future::Future;
230     use std::time::Duration;
231 
232     /// The Tokio runtime.
233     ///
234     /// The runtime provides an I/O driver, task scheduler, [timer], and
235     /// blocking pool, necessary for running asynchronous tasks.
236     ///
237     /// Instances of `Runtime` can be created using [`new`], or [`Builder`].
238     /// However, most users will use the `#[tokio::main]` annotation on their
239     /// entry point instead.
240     ///
241     /// See [module level][mod] documentation for more details.
242     ///
243     /// # Shutdown
244     ///
245     /// Shutting down the runtime is done by dropping the value. The current
246     /// thread will block until the shut down operation has completed.
247     ///
248     /// * Drain any scheduled work queues.
249     /// * Drop any futures that have not yet completed.
250     /// * Drop the reactor.
251     ///
252     /// Once the reactor has dropped, any outstanding I/O resources bound to
253     /// that reactor will no longer function. Calling any method on them will
254     /// result in an error.
255     ///
256     /// # Sharing
257     ///
258     /// The Tokio runtime implements `Sync` and `Send` to allow you to wrap it
259     /// in a `Arc`. Most fn take `&self` to allow you to call them concurrently
260     /// across multiple threads.
261     ///
262     /// Calls to `shutdown` and `shutdown_timeout` require exclusive ownership of
263     /// the runtime type and this can be achieved via `Arc::try_unwrap` when only
264     /// one strong count reference is left over.
265     ///
266     /// [timer]: crate::time
267     /// [mod]: index.html
268     /// [`new`]: method@Self::new
269     /// [`Builder`]: struct@Builder
270     #[derive(Debug)]
271     pub struct Runtime {
272         /// Task executor
273         kind: Kind,
274 
275         /// Handle to runtime, also contains driver handles
276         handle: Handle,
277 
278         /// Blocking pool handle, used to signal shutdown
279         blocking_pool: BlockingPool,
280     }
281 
282     /// The runtime executor is either a thread-pool or a current-thread executor.
283     #[derive(Debug)]
284     enum Kind {
285         /// Execute all tasks on the current-thread.
286         CurrentThread(BasicScheduler<driver::Driver>),
287 
288         /// Execute tasks across multiple threads.
289         #[cfg(feature = "rt-multi-thread")]
290         ThreadPool(ThreadPool),
291     }
292 
293     /// After thread starts / before thread stops
294     type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;
295 
296     impl Runtime {
297         /// Creates a new runtime instance with default configuration values.
298         ///
299         /// This results in the multi threaded scheduler, I/O driver, and time driver being
300         /// initialized.
301         ///
302         /// Most applications will not need to call this function directly. Instead,
303         /// they will use the  [`#[tokio::main]` attribute][main]. When a more complex
304         /// configuration is necessary, the [runtime builder] may be used.
305         ///
306         /// See [module level][mod] documentation for more details.
307         ///
308         /// # Examples
309         ///
310         /// Creating a new `Runtime` with default configuration values.
311         ///
312         /// ```
313         /// use tokio::runtime::Runtime;
314         ///
315         /// let rt = Runtime::new()
316         ///     .unwrap();
317         ///
318         /// // Use the runtime...
319         /// ```
320         ///
321         /// [mod]: index.html
322         /// [main]: ../attr.main.html
323         /// [threaded scheduler]: index.html#threaded-scheduler
324         /// [basic scheduler]: index.html#basic-scheduler
325         /// [runtime builder]: crate::runtime::Builder
326         #[cfg(feature = "rt-multi-thread")]
327         #[cfg_attr(docsrs, doc(cfg(feature = "rt-multi-thread")))]
328         pub fn new() -> std::io::Result<Runtime> {
329             Builder::new_multi_thread().enable_all().build()
330         }
331 
332         /// Returns a handle to the runtime's spawner.
333         ///
334         /// The returned handle can be used to spawn tasks that run on this runtime, and can
335         /// be cloned to allow moving the `Handle` to other threads.
336         ///
337         /// # Examples
338         ///
339         /// ```
340         /// use tokio::runtime::Runtime;
341         ///
342         /// let rt = Runtime::new()
343         ///     .unwrap();
344         ///
345         /// let handle = rt.handle();
346         ///
347         /// // Use the handle...
348         /// ```
349         pub fn handle(&self) -> &Handle {
350             &self.handle
351         }
352 
353         /// Spawns a future onto the Tokio runtime.
354         ///
355         /// This spawns the given future onto the runtime's executor, usually a
356         /// thread pool. The thread pool is then responsible for polling the future
357         /// until it completes.
358         ///
359         /// See [module level][mod] documentation for more details.
360         ///
361         /// [mod]: index.html
362         ///
363         /// # Examples
364         ///
365         /// ```
366         /// use tokio::runtime::Runtime;
367         ///
368         /// # fn dox() {
369         /// // Create the runtime
370         /// let rt = Runtime::new().unwrap();
371         ///
372         /// // Spawn a future onto the runtime
373         /// rt.spawn(async {
374         ///     println!("now running on a worker thread");
375         /// });
376         /// # }
377         /// ```
378         #[cfg_attr(tokio_track_caller, track_caller)]
379         pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
380         where
381             F: Future + Send + 'static,
382             F::Output: Send + 'static,
383         {
384             self.handle.spawn(future)
385         }
386 
387         /// Runs the provided function on an executor dedicated to blocking operations.
388         ///
389         /// # Examples
390         ///
391         /// ```
392         /// use tokio::runtime::Runtime;
393         ///
394         /// # fn dox() {
395         /// // Create the runtime
396         /// let rt = Runtime::new().unwrap();
397         ///
398         /// // Spawn a blocking function onto the runtime
399         /// rt.spawn_blocking(|| {
400         ///     println!("now running on a worker thread");
401         /// });
402         /// # }
403         #[cfg_attr(tokio_track_caller, track_caller)]
404         pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
405         where
406             F: FnOnce() -> R + Send + 'static,
407             R: Send + 'static,
408         {
409             self.handle.spawn_blocking(func)
410         }
411 
412         /// Runs a future to completion on the Tokio runtime. This is the
413         /// runtime's entry point.
414         ///
415         /// This runs the given future on the current thread, blocking until it is
416         /// complete, and yielding its resolved result. Any tasks or timers
417         /// which the future spawns internally will be executed on the runtime.
418         ///
419         /// # Multi thread scheduler
420         ///
421         /// When the multi thread scheduler is used this will allow futures
422         /// to run within the io driver and timer context of the overall runtime.
423         ///
424         /// # Current thread scheduler
425         ///
426         /// When the current thread scheduler is enabled `block_on`
427         /// can be called concurrently from multiple threads. The first call
428         /// will take ownership of the io and timer drivers. This means
429         /// other threads which do not own the drivers will hook into that one.
430         /// When the first `block_on` completes, other threads will be able to
431         /// "steal" the driver to allow continued execution of their futures.
432         ///
433         /// # Panics
434         ///
435         /// This function panics if the provided future panics, or if called within an
436         /// asynchronous execution context.
437         ///
438         /// # Examples
439         ///
440         /// ```no_run
441         /// use tokio::runtime::Runtime;
442         ///
443         /// // Create the runtime
444         /// let rt  = Runtime::new().unwrap();
445         ///
446         /// // Execute the future, blocking the current thread until completion
447         /// rt.block_on(async {
448         ///     println!("hello");
449         /// });
450         /// ```
451         ///
452         /// [handle]: fn@Handle::block_on
453         #[cfg_attr(tokio_track_caller, track_caller)]
454         pub fn block_on<F: Future>(&self, future: F) -> F::Output {
455             #[cfg(all(tokio_unstable, feature = "tracing"))]
456             let future = crate::util::trace::task(future, "block_on", None);
457 
458             let _enter = self.enter();
459 
460             match &self.kind {
461                 Kind::CurrentThread(exec) => exec.block_on(future),
462                 #[cfg(feature = "rt-multi-thread")]
463                 Kind::ThreadPool(exec) => exec.block_on(future),
464             }
465         }
466 
467         /// Enters the runtime context.
468         ///
469         /// This allows you to construct types that must have an executor
470         /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
471         /// also allow you to call methods such as [`tokio::spawn`].
472         ///
473         /// [`Sleep`]: struct@crate::time::Sleep
474         /// [`TcpStream`]: struct@crate::net::TcpStream
475         /// [`tokio::spawn`]: fn@crate::spawn
476         ///
477         /// # Example
478         ///
479         /// ```
480         /// use tokio::runtime::Runtime;
481         ///
482         /// fn function_that_spawns(msg: String) {
483         ///     // Had we not used `rt.enter` below, this would panic.
484         ///     tokio::spawn(async move {
485         ///         println!("{}", msg);
486         ///     });
487         /// }
488         ///
489         /// fn main() {
490         ///     let rt = Runtime::new().unwrap();
491         ///
492         ///     let s = "Hello World!".to_string();
493         ///
494         ///     // By entering the context, we tie `tokio::spawn` to this executor.
495         ///     let _guard = rt.enter();
496         ///     function_that_spawns(s);
497         /// }
498         /// ```
499         pub fn enter(&self) -> EnterGuard<'_> {
500             self.handle.enter()
501         }
502 
503         /// Shuts down the runtime, waiting for at most `duration` for all spawned
504         /// task to shutdown.
505         ///
506         /// Usually, dropping a `Runtime` handle is sufficient as tasks are able to
507         /// shutdown in a timely fashion. However, dropping a `Runtime` will wait
508         /// indefinitely for all tasks to terminate, and there are cases where a long
509         /// blocking task has been spawned, which can block dropping `Runtime`.
510         ///
511         /// In this case, calling `shutdown_timeout` with an explicit wait timeout
512         /// can work. The `shutdown_timeout` will signal all tasks to shutdown and
513         /// will wait for at most `duration` for all spawned tasks to terminate. If
514         /// `timeout` elapses before all tasks are dropped, the function returns and
515         /// outstanding tasks are potentially leaked.
516         ///
517         /// # Examples
518         ///
519         /// ```
520         /// use tokio::runtime::Runtime;
521         /// use tokio::task;
522         ///
523         /// use std::thread;
524         /// use std::time::Duration;
525         ///
526         /// fn main() {
527         ///    let runtime = Runtime::new().unwrap();
528         ///
529         ///    runtime.block_on(async move {
530         ///        task::spawn_blocking(move || {
531         ///            thread::sleep(Duration::from_secs(10_000));
532         ///        });
533         ///    });
534         ///
535         ///    runtime.shutdown_timeout(Duration::from_millis(100));
536         /// }
537         /// ```
538         pub fn shutdown_timeout(mut self, duration: Duration) {
539             // Wakeup and shutdown all the worker threads
540             self.handle.clone().shutdown();
541             self.blocking_pool.shutdown(Some(duration));
542         }
543 
544         /// Shuts down the runtime, without waiting for any spawned tasks to shutdown.
545         ///
546         /// This can be useful if you want to drop a runtime from within another runtime.
547         /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
548         /// to complete, which would normally not be permitted within an asynchronous context.
549         /// By calling `shutdown_background()`, you can drop the runtime from such a context.
550         ///
551         /// Note however, that because we do not wait for any blocking tasks to complete, this
552         /// may result in a resource leak (in that any blocking tasks are still running until they
553         /// return.
554         ///
555         /// This function is equivalent to calling `shutdown_timeout(Duration::of_nanos(0))`.
556         ///
557         /// ```
558         /// use tokio::runtime::Runtime;
559         ///
560         /// fn main() {
561         ///    let runtime = Runtime::new().unwrap();
562         ///
563         ///    runtime.block_on(async move {
564         ///        let inner_runtime = Runtime::new().unwrap();
565         ///        // ...
566         ///        inner_runtime.shutdown_background();
567         ///    });
568         /// }
569         /// ```
570         pub fn shutdown_background(self) {
571             self.shutdown_timeout(Duration::from_nanos(0))
572         }
573     }
574 
575     #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
576     impl Drop for Runtime {
577         fn drop(&mut self) {
578             match &mut self.kind {
579                 Kind::CurrentThread(basic) => {
580                     // This ensures that tasks spawned on the basic runtime are dropped inside the
581                     // runtime's context.
582                     match self::context::try_enter(self.handle.clone()) {
583                         Some(guard) => basic.set_context_guard(guard),
584                         None => {
585                             // The context thread-local has alread been destroyed.
586                             //
587                             // We don't set the guard in this case. Calls to tokio::spawn in task
588                             // destructors would fail regardless if this happens.
589                         },
590                     }
591                 },
592                 #[cfg(feature = "rt-multi-thread")]
593                 Kind::ThreadPool(_) => {
594                     // The threaded scheduler drops its tasks on its worker threads, which is
595                     // already in the runtime's context.
596                 },
597             }
598         }
599     }
600 }
601