// NOTE: source-viewer navigation links (Home, Line#, Scopes#, Navigate#, Raw, Download) were here; they are not part of the file.
// `LocalRuntimeScheduler` currently has a single variant, so every `if let` on it is
// irrefutable; the `if let`/`else` form is kept deliberately.
#![allow(irrefutable_let_patterns)]

use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use crate::runtime::{context, Builder, EnterGuard, Handle, BOX_FUTURE_THRESHOLD};
use crate::task::JoinHandle;

use crate::util::trace::SpawnMeta;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::time::Duration;
13 
14 /// A local Tokio runtime.
15 ///
16 /// This runtime is capable of driving tasks which are not `Send + Sync` without the use of a
17 /// `LocalSet`, and thus supports `spawn_local` without the need for a `LocalSet` context.
18 ///
19 /// This runtime cannot be moved between threads or driven from different threads.
20 ///
21 /// This runtime is incompatible with `LocalSet`. You should not attempt to drive a `LocalSet` within a
22 /// `LocalRuntime`.
23 ///
24 /// Currently, this runtime supports one flavor, which is internally identical to `current_thread`,
25 /// save for the aforementioned differences related to `spawn_local`.
26 ///
27 /// For more general information on how to use runtimes, see the [module] docs.
28 ///
29 /// [runtime]: crate::runtime::Runtime
30 /// [module]: crate::runtime
31 #[derive(Debug)]
32 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
33 pub struct LocalRuntime {
34     /// Task scheduler
35     scheduler: LocalRuntimeScheduler,
36 
37     /// Handle to runtime, also contains driver handles
38     handle: Handle,
39 
40     /// Blocking pool handle, used to signal shutdown
41     blocking_pool: BlockingPool,
42 
43     /// Marker used to make this !Send and !Sync.
44     _phantom: PhantomData<*mut u8>,
45 }
46 
47 /// The runtime scheduler is always a `current_thread` scheduler right now.
48 #[derive(Debug)]
49 pub(crate) enum LocalRuntimeScheduler {
50     /// Execute all tasks on the current-thread.
51     CurrentThread(CurrentThread),
52 }
53 
54 impl LocalRuntime {
from_parts( scheduler: LocalRuntimeScheduler, handle: Handle, blocking_pool: BlockingPool, ) -> LocalRuntime55     pub(crate) fn from_parts(
56         scheduler: LocalRuntimeScheduler,
57         handle: Handle,
58         blocking_pool: BlockingPool,
59     ) -> LocalRuntime {
60         LocalRuntime {
61             scheduler,
62             handle,
63             blocking_pool,
64             _phantom: Default::default(),
65         }
66     }
67 
68     /// Creates a new local runtime instance with default configuration values.
69     ///
70     /// This results in the scheduler, I/O driver, and time driver being
71     /// initialized.
72     ///
73     /// When a more complex configuration is necessary, the [runtime builder] may be used.
74     ///
75     /// See [module level][mod] documentation for more details.
76     ///
77     /// # Examples
78     ///
79     /// Creating a new `LocalRuntime` with default configuration values.
80     ///
81     /// ```
82     /// use tokio::runtime::LocalRuntime;
83     ///
84     /// let rt = LocalRuntime::new()
85     ///     .unwrap();
86     ///
87     /// // Use the runtime...
88     /// ```
89     ///
90     /// [mod]: crate::runtime
91     /// [runtime builder]: crate::runtime::Builder
new() -> std::io::Result<LocalRuntime>92     pub fn new() -> std::io::Result<LocalRuntime> {
93         Builder::new_current_thread()
94             .enable_all()
95             .build_local(&Default::default())
96     }
97 
98     /// Returns a handle to the runtime's spawner.
99     ///
100     /// The returned handle can be used to spawn tasks that run on this runtime, and can
101     /// be cloned to allow moving the `Handle` to other threads.
102     ///
103     /// As the handle can be sent to other threads, it can only be used to spawn tasks that are `Send`.
104     ///
105     /// Calling [`Handle::block_on`] on a handle to a `LocalRuntime` is error-prone.
106     /// Refer to the documentation of [`Handle::block_on`] for more.
107     ///
108     /// # Examples
109     ///
110     /// ```
111     /// use tokio::runtime::LocalRuntime;
112     ///
113     /// let rt = LocalRuntime::new()
114     ///     .unwrap();
115     ///
116     /// let handle = rt.handle();
117     ///
118     /// // Use the handle...
119     /// ```
handle(&self) -> &Handle120     pub fn handle(&self) -> &Handle {
121         &self.handle
122     }
123 
124     /// Spawns a task on the runtime.
125     ///
126     /// This is analogous to the [`spawn`] method on the standard [`Runtime`], but works even if the task is not thread-safe.
127     ///
128     /// [`spawn`]: crate::runtime::Runtime::spawn
129     /// [`Runtime`]: crate::runtime::Runtime
130     ///
131     /// # Examples
132     ///
133     /// ```
134     /// use tokio::runtime::LocalRuntime;
135     ///
136     /// # fn dox() {
137     /// // Create the runtime
138     /// let rt = LocalRuntime::new().unwrap();
139     ///
140     /// // Spawn a future onto the runtime
141     /// rt.spawn_local(async {
142     ///     println!("now running on a worker thread");
143     /// });
144     /// # }
145     /// ```
146     #[track_caller]
spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,147     pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output>
148     where
149         F: Future + 'static,
150         F::Output: 'static,
151     {
152         let fut_size = std::mem::size_of::<F>();
153         let meta = SpawnMeta::new_unnamed(fut_size);
154 
155         // safety: spawn_local can only be called from `LocalRuntime`, which this is
156         unsafe {
157             if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
158                 self.handle.spawn_local_named(Box::pin(future), meta)
159             } else {
160                 self.handle.spawn_local_named(future, meta)
161             }
162         }
163     }
164 
165     /// Runs the provided function on a thread from a dedicated blocking thread pool.
166     ///
167     /// This function _will_ be run on another thread.
168     ///
169     /// See the documentation in the non-local runtime for more information.
170     ///
171     /// [Runtime]: crate::runtime::Runtime::spawn_blocking
172     ///
173     /// # Examples
174     ///
175     /// ```
176     /// use tokio::runtime::LocalRuntime;
177     ///
178     /// # fn dox() {
179     /// // Create the runtime
180     /// let rt = LocalRuntime::new().unwrap();
181     ///
182     /// // Spawn a blocking function onto the runtime
183     /// rt.spawn_blocking(|| {
184     ///     println!("now running on a worker thread");
185     /// });
186     /// # }
187     /// ```
188     #[track_caller]
spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,189     pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
190     where
191         F: FnOnce() -> R + Send + 'static,
192         R: Send + 'static,
193     {
194         self.handle.spawn_blocking(func)
195     }
196 
197     /// Runs a future to completion on the Tokio runtime. This is the
198     /// runtime's entry point.
199     ///
200     /// See the documentation for [the equivalent method on Runtime] for more information.
201     ///
202     /// [Runtime]: crate::runtime::Runtime::block_on
203     ///
204     /// # Examples
205     ///
206     /// ```no_run
207     /// use tokio::runtime::LocalRuntime;
208     ///
209     /// // Create the runtime
210     /// let rt  = LocalRuntime::new().unwrap();
211     ///
212     /// // Execute the future, blocking the current thread until completion
213     /// rt.block_on(async {
214     ///     println!("hello");
215     /// });
216     /// ```
217     #[track_caller]
block_on<F: Future>(&self, future: F) -> F::Output218     pub fn block_on<F: Future>(&self, future: F) -> F::Output {
219         let fut_size = mem::size_of::<F>();
220         let meta = SpawnMeta::new_unnamed(fut_size);
221 
222         if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
223             self.block_on_inner(Box::pin(future), meta)
224         } else {
225             self.block_on_inner(future, meta)
226         }
227     }
228 
229     #[track_caller]
block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output230     fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
231         #[cfg(all(
232             tokio_unstable,
233             tokio_taskdump,
234             feature = "rt",
235             target_os = "linux",
236             any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
237         ))]
238         let future = crate::runtime::task::trace::Trace::root(future);
239 
240         #[cfg(all(tokio_unstable, feature = "tracing"))]
241         let future = crate::util::trace::task(
242             future,
243             "block_on",
244             _meta,
245             crate::runtime::task::Id::next().as_u64(),
246         );
247 
248         let _enter = self.enter();
249 
250         if let LocalRuntimeScheduler::CurrentThread(exec) = &self.scheduler {
251             exec.block_on(&self.handle.inner, future)
252         } else {
253             unreachable!("LocalRuntime only supports current_thread")
254         }
255     }
256 
257     /// Enters the runtime context.
258     ///
259     /// This allows you to construct types that must have an executor
260     /// available on creation such as [`Sleep`] or [`TcpStream`]. It will
261     /// also allow you to call methods such as [`tokio::spawn`].
262     ///
263     /// If this is a handle to a [`LocalRuntime`], and this function is being invoked from the same
264     /// thread that the runtime was created on, you will also be able to call
265     /// [`tokio::task::spawn_local`].
266     ///
267     /// [`Sleep`]: struct@crate::time::Sleep
268     /// [`TcpStream`]: struct@crate::net::TcpStream
269     /// [`tokio::spawn`]: fn@crate::spawn
270     /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime
271     /// [`tokio::task::spawn_local`]: fn@crate::task::spawn_local
272     ///
273     /// # Example
274     ///
275     /// ```
276     /// use tokio::runtime::LocalRuntime;
277     /// use tokio::task::JoinHandle;
278     ///
279     /// fn function_that_spawns(msg: String) -> JoinHandle<()> {
280     ///     // Had we not used `rt.enter` below, this would panic.
281     ///     tokio::spawn(async move {
282     ///         println!("{}", msg);
283     ///     })
284     /// }
285     ///
286     /// fn main() {
287     ///     let rt = LocalRuntime::new().unwrap();
288     ///
289     ///     let s = "Hello World!".to_string();
290     ///
291     ///     // By entering the context, we tie `tokio::spawn` to this executor.
292     ///     let _guard = rt.enter();
293     ///     let handle = function_that_spawns(s);
294     ///
295     ///     // Wait for the task before we end the test.
296     ///     rt.block_on(handle).unwrap();
297     /// }
298     /// ```
enter(&self) -> EnterGuard<'_>299     pub fn enter(&self) -> EnterGuard<'_> {
300         self.handle.enter()
301     }
302 
303     /// Shuts down the runtime, waiting for at most `duration` for all spawned
304     /// work to stop.
305     ///
306     /// Note that `spawn_blocking` tasks, and only `spawn_blocking` tasks, can get left behind if
307     /// the timeout expires.
308     ///
309     /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
310     ///
311     /// # Examples
312     ///
313     /// ```
314     /// use tokio::runtime::LocalRuntime;
315     /// use tokio::task;
316     ///
317     /// use std::thread;
318     /// use std::time::Duration;
319     ///
320     /// fn main() {
321     ///    let runtime = LocalRuntime::new().unwrap();
322     ///
323     ///    runtime.block_on(async move {
324     ///        task::spawn_blocking(move || {
325     ///            thread::sleep(Duration::from_secs(10_000));
326     ///        });
327     ///    });
328     ///
329     ///    runtime.shutdown_timeout(Duration::from_millis(100));
330     /// }
331     /// ```
shutdown_timeout(mut self, duration: Duration)332     pub fn shutdown_timeout(mut self, duration: Duration) {
333         // Wakeup and shutdown all the worker threads
334         self.handle.inner.shutdown();
335         self.blocking_pool.shutdown(Some(duration));
336     }
337 
338     /// Shuts down the runtime, without waiting for any spawned work to stop.
339     ///
340     /// This can be useful if you want to drop a runtime from within another runtime.
341     /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks
342     /// to complete, which would normally not be permitted within an asynchronous context.
343     /// By calling `shutdown_background()`, you can drop the runtime from such a context.
344     ///
345     /// Note however, that because we do not wait for any blocking tasks to complete, this
346     /// may result in a resource leak (in that any blocking tasks are still running until they
347     /// return. No other tasks will leak.
348     ///
349     /// See the [struct level documentation](LocalRuntime#shutdown) for more details.
350     ///
351     /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`.
352     ///
353     /// ```
354     /// use tokio::runtime::LocalRuntime;
355     ///
356     /// fn main() {
357     ///    let runtime = LocalRuntime::new().unwrap();
358     ///
359     ///    runtime.block_on(async move {
360     ///        let inner_runtime = LocalRuntime::new().unwrap();
361     ///        // ...
362     ///        inner_runtime.shutdown_background();
363     ///    });
364     /// }
365     /// ```
shutdown_background(self)366     pub fn shutdown_background(self) {
367         self.shutdown_timeout(Duration::from_nanos(0));
368     }
369 
370     /// Returns a view that lets you get information about how the runtime
371     /// is performing.
metrics(&self) -> crate::runtime::RuntimeMetrics372     pub fn metrics(&self) -> crate::runtime::RuntimeMetrics {
373         self.handle.metrics()
374     }
375 }
376 
377 #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let
378 impl Drop for LocalRuntime {
drop(&mut self)379     fn drop(&mut self) {
380         if let LocalRuntimeScheduler::CurrentThread(current_thread) = &mut self.scheduler {
381             // This ensures that tasks spawned on the current-thread
382             // runtime are dropped inside the runtime's context.
383             let _guard = context::try_set_current(&self.handle.inner);
384             current_thread.shutdown(&self.handle.inner);
385         } else {
386             unreachable!("LocalRuntime only supports current-thread")
387         }
388     }
389 }
390 
391 impl std::panic::UnwindSafe for LocalRuntime {}
392 
393 impl std::panic::RefUnwindSafe for LocalRuntime {}
394