1 #![allow(irrefutable_let_patterns)] 2 3 use crate::runtime::blocking::BlockingPool; 4 use crate::runtime::scheduler::CurrentThread; 5 use crate::runtime::{context, Builder, EnterGuard, Handle, BOX_FUTURE_THRESHOLD}; 6 use crate::task::JoinHandle; 7 8 use crate::util::trace::SpawnMeta; 9 use std::future::Future; 10 use std::marker::PhantomData; 11 use std::mem; 12 use std::time::Duration; 13 14 /// A local Tokio runtime. 15 /// 16 /// This runtime is capable of driving tasks which are not `Send + Sync` without the use of a 17 /// `LocalSet`, and thus supports `spawn_local` without the need for a `LocalSet` context. 18 /// 19 /// This runtime cannot be moved between threads or driven from different threads. 20 /// 21 /// This runtime is incompatible with `LocalSet`. You should not attempt to drive a `LocalSet` within a 22 /// `LocalRuntime`. 23 /// 24 /// Currently, this runtime supports one flavor, which is internally identical to `current_thread`, 25 /// save for the aforementioned differences related to `spawn_local`. 26 /// 27 /// For more general information on how to use runtimes, see the [module] docs. 28 /// 29 /// [runtime]: crate::runtime::Runtime 30 /// [module]: crate::runtime 31 #[derive(Debug)] 32 #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))] 33 pub struct LocalRuntime { 34 /// Task scheduler 35 scheduler: LocalRuntimeScheduler, 36 37 /// Handle to runtime, also contains driver handles 38 handle: Handle, 39 40 /// Blocking pool handle, used to signal shutdown 41 blocking_pool: BlockingPool, 42 43 /// Marker used to make this !Send and !Sync. 44 _phantom: PhantomData<*mut u8>, 45 } 46 47 /// The runtime scheduler is always a `current_thread` scheduler right now. 48 #[derive(Debug)] 49 pub(crate) enum LocalRuntimeScheduler { 50 /// Execute all tasks on the current-thread. 51 CurrentThread(CurrentThread), 52 } 53 54 impl LocalRuntime { from_parts( scheduler: LocalRuntimeScheduler, handle: Handle, blocking_pool: BlockingPool, ) -> LocalRuntime55 pub(crate) fn from_parts( 56 scheduler: LocalRuntimeScheduler, 57 handle: Handle, 58 blocking_pool: BlockingPool, 59 ) -> LocalRuntime { 60 LocalRuntime { 61 scheduler, 62 handle, 63 blocking_pool, 64 _phantom: Default::default(), 65 } 66 } 67 68 /// Creates a new local runtime instance with default configuration values. 69 /// 70 /// This results in the scheduler, I/O driver, and time driver being 71 /// initialized. 72 /// 73 /// When a more complex configuration is necessary, the [runtime builder] may be used. 74 /// 75 /// See [module level][mod] documentation for more details. 76 /// 77 /// # Examples 78 /// 79 /// Creating a new `LocalRuntime` with default configuration values. 80 /// 81 /// ``` 82 /// use tokio::runtime::LocalRuntime; 83 /// 84 /// let rt = LocalRuntime::new() 85 /// .unwrap(); 86 /// 87 /// // Use the runtime... 88 /// ``` 89 /// 90 /// [mod]: crate::runtime 91 /// [runtime builder]: crate::runtime::Builder new() -> std::io::Result<LocalRuntime>92 pub fn new() -> std::io::Result<LocalRuntime> { 93 Builder::new_current_thread() 94 .enable_all() 95 .build_local(&Default::default()) 96 } 97 98 /// Returns a handle to the runtime's spawner. 99 /// 100 /// The returned handle can be used to spawn tasks that run on this runtime, and can 101 /// be cloned to allow moving the `Handle` to other threads. 102 /// 103 /// As the handle can be sent to other threads, it can only be used to spawn tasks that are `Send`. 104 /// 105 /// Calling [`Handle::block_on`] on a handle to a `LocalRuntime` is error-prone. 106 /// Refer to the documentation of [`Handle::block_on`] for more. 107 /// 108 /// # Examples 109 /// 110 /// ``` 111 /// use tokio::runtime::LocalRuntime; 112 /// 113 /// let rt = LocalRuntime::new() 114 /// .unwrap(); 115 /// 116 /// let handle = rt.handle(); 117 /// 118 /// // Use the handle... 119 /// ``` handle(&self) -> &Handle120 pub fn handle(&self) -> &Handle { 121 &self.handle 122 } 123 124 /// Spawns a task on the runtime. 125 /// 126 /// This is analogous to the [`spawn`] method on the standard [`Runtime`], but works even if the task is not thread-safe. 127 /// 128 /// [`spawn`]: crate::runtime::Runtime::spawn 129 /// [`Runtime`]: crate::runtime::Runtime 130 /// 131 /// # Examples 132 /// 133 /// ``` 134 /// use tokio::runtime::LocalRuntime; 135 /// 136 /// # fn dox() { 137 /// // Create the runtime 138 /// let rt = LocalRuntime::new().unwrap(); 139 /// 140 /// // Spawn a future onto the runtime 141 /// rt.spawn_local(async { 142 /// println!("now running on a worker thread"); 143 /// }); 144 /// # } 145 /// ``` 146 #[track_caller] spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + 'static, F::Output: 'static,147 pub fn spawn_local<F>(&self, future: F) -> JoinHandle<F::Output> 148 where 149 F: Future + 'static, 150 F::Output: 'static, 151 { 152 let fut_size = std::mem::size_of::<F>(); 153 let meta = SpawnMeta::new_unnamed(fut_size); 154 155 // safety: spawn_local can only be called from `LocalRuntime`, which this is 156 unsafe { 157 if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { 158 self.handle.spawn_local_named(Box::pin(future), meta) 159 } else { 160 self.handle.spawn_local_named(future, meta) 161 } 162 } 163 } 164 165 /// Runs the provided function on a thread from a dedicated blocking thread pool. 166 /// 167 /// This function _will_ be run on another thread. 168 /// 169 /// See the documentation in the non-local runtime for more information. 170 /// 171 /// [Runtime]: crate::runtime::Runtime::spawn_blocking 172 /// 173 /// # Examples 174 /// 175 /// ``` 176 /// use tokio::runtime::LocalRuntime; 177 /// 178 /// # fn dox() { 179 /// // Create the runtime 180 /// let rt = LocalRuntime::new().unwrap(); 181 /// 182 /// // Spawn a blocking function onto the runtime 183 /// rt.spawn_blocking(|| { 184 /// println!("now running on a worker thread"); 185 /// }); 186 /// # } 187 /// ``` 188 #[track_caller] spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,189 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> 190 where 191 F: FnOnce() -> R + Send + 'static, 192 R: Send + 'static, 193 { 194 self.handle.spawn_blocking(func) 195 } 196 197 /// Runs a future to completion on the Tokio runtime. This is the 198 /// runtime's entry point. 199 /// 200 /// See the documentation for [the equivalent method on Runtime] for more information. 201 /// 202 /// [Runtime]: crate::runtime::Runtime::block_on 203 /// 204 /// # Examples 205 /// 206 /// ```no_run 207 /// use tokio::runtime::LocalRuntime; 208 /// 209 /// // Create the runtime 210 /// let rt = LocalRuntime::new().unwrap(); 211 /// 212 /// // Execute the future, blocking the current thread until completion 213 /// rt.block_on(async { 214 /// println!("hello"); 215 /// }); 216 /// ``` 217 #[track_caller] block_on<F: Future>(&self, future: F) -> F::Output218 pub fn block_on<F: Future>(&self, future: F) -> F::Output { 219 let fut_size = mem::size_of::<F>(); 220 let meta = SpawnMeta::new_unnamed(fut_size); 221 222 if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD { 223 self.block_on_inner(Box::pin(future), meta) 224 } else { 225 self.block_on_inner(future, meta) 226 } 227 } 228 229 #[track_caller] block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output230 fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { 231 #[cfg(all( 232 tokio_unstable, 233 tokio_taskdump, 234 feature = "rt", 235 target_os = "linux", 236 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") 237 ))] 238 let future = crate::runtime::task::trace::Trace::root(future); 239 240 #[cfg(all(tokio_unstable, feature = "tracing"))] 241 let future = crate::util::trace::task( 242 future, 243 "block_on", 244 _meta, 245 crate::runtime::task::Id::next().as_u64(), 246 ); 247 248 let _enter = self.enter(); 249 250 if let LocalRuntimeScheduler::CurrentThread(exec) = &self.scheduler { 251 exec.block_on(&self.handle.inner, future) 252 } else { 253 unreachable!("LocalRuntime only supports current_thread") 254 } 255 } 256 257 /// Enters the runtime context. 258 /// 259 /// This allows you to construct types that must have an executor 260 /// available on creation such as [`Sleep`] or [`TcpStream`]. It will 261 /// also allow you to call methods such as [`tokio::spawn`]. 262 /// 263 /// If this is a handle to a [`LocalRuntime`], and this function is being invoked from the same 264 /// thread that the runtime was created on, you will also be able to call 265 /// [`tokio::task::spawn_local`]. 266 /// 267 /// [`Sleep`]: struct@crate::time::Sleep 268 /// [`TcpStream`]: struct@crate::net::TcpStream 269 /// [`tokio::spawn`]: fn@crate::spawn 270 /// [`LocalRuntime`]: struct@crate::runtime::LocalRuntime 271 /// [`tokio::task::spawn_local`]: fn@crate::task::spawn_local 272 /// 273 /// # Example 274 /// 275 /// ``` 276 /// use tokio::runtime::LocalRuntime; 277 /// use tokio::task::JoinHandle; 278 /// 279 /// fn function_that_spawns(msg: String) -> JoinHandle<()> { 280 /// // Had we not used `rt.enter` below, this would panic. 281 /// tokio::spawn(async move { 282 /// println!("{}", msg); 283 /// }) 284 /// } 285 /// 286 /// fn main() { 287 /// let rt = LocalRuntime::new().unwrap(); 288 /// 289 /// let s = "Hello World!".to_string(); 290 /// 291 /// // By entering the context, we tie `tokio::spawn` to this executor. 292 /// let _guard = rt.enter(); 293 /// let handle = function_that_spawns(s); 294 /// 295 /// // Wait for the task before we end the test. 296 /// rt.block_on(handle).unwrap(); 297 /// } 298 /// ``` enter(&self) -> EnterGuard<'_>299 pub fn enter(&self) -> EnterGuard<'_> { 300 self.handle.enter() 301 } 302 303 /// Shuts down the runtime, waiting for at most `duration` for all spawned 304 /// work to stop. 305 /// 306 /// Note that `spawn_blocking` tasks, and only `spawn_blocking` tasks, can get left behind if 307 /// the timeout expires. 308 /// 309 /// See the [struct level documentation](LocalRuntime#shutdown) for more details. 310 /// 311 /// # Examples 312 /// 313 /// ``` 314 /// use tokio::runtime::LocalRuntime; 315 /// use tokio::task; 316 /// 317 /// use std::thread; 318 /// use std::time::Duration; 319 /// 320 /// fn main() { 321 /// let runtime = LocalRuntime::new().unwrap(); 322 /// 323 /// runtime.block_on(async move { 324 /// task::spawn_blocking(move || { 325 /// thread::sleep(Duration::from_secs(10_000)); 326 /// }); 327 /// }); 328 /// 329 /// runtime.shutdown_timeout(Duration::from_millis(100)); 330 /// } 331 /// ``` shutdown_timeout(mut self, duration: Duration)332 pub fn shutdown_timeout(mut self, duration: Duration) { 333 // Wakeup and shutdown all the worker threads 334 self.handle.inner.shutdown(); 335 self.blocking_pool.shutdown(Some(duration)); 336 } 337 338 /// Shuts down the runtime, without waiting for any spawned work to stop. 339 /// 340 /// This can be useful if you want to drop a runtime from within another runtime. 341 /// Normally, dropping a runtime will block indefinitely for spawned blocking tasks 342 /// to complete, which would normally not be permitted within an asynchronous context. 343 /// By calling `shutdown_background()`, you can drop the runtime from such a context. 344 /// 345 /// Note however, that because we do not wait for any blocking tasks to complete, this 346 /// may result in a resource leak (in that any blocking tasks are still running until they 347 /// return. No other tasks will leak. 348 /// 349 /// See the [struct level documentation](LocalRuntime#shutdown) for more details. 350 /// 351 /// This function is equivalent to calling `shutdown_timeout(Duration::from_nanos(0))`. 352 /// 353 /// ``` 354 /// use tokio::runtime::LocalRuntime; 355 /// 356 /// fn main() { 357 /// let runtime = LocalRuntime::new().unwrap(); 358 /// 359 /// runtime.block_on(async move { 360 /// let inner_runtime = LocalRuntime::new().unwrap(); 361 /// // ... 362 /// inner_runtime.shutdown_background(); 363 /// }); 364 /// } 365 /// ``` shutdown_background(self)366 pub fn shutdown_background(self) { 367 self.shutdown_timeout(Duration::from_nanos(0)); 368 } 369 370 /// Returns a view that lets you get information about how the runtime 371 /// is performing. metrics(&self) -> crate::runtime::RuntimeMetrics372 pub fn metrics(&self) -> crate::runtime::RuntimeMetrics { 373 self.handle.metrics() 374 } 375 } 376 377 #[allow(clippy::single_match)] // there are comments in the error branch, so we don't want if-let 378 impl Drop for LocalRuntime { drop(&mut self)379 fn drop(&mut self) { 380 if let LocalRuntimeScheduler::CurrentThread(current_thread) = &mut self.scheduler { 381 // This ensures that tasks spawned on the current-thread 382 // runtime are dropped inside the runtime's context. 383 let _guard = context::try_set_current(&self.handle.inner); 384 current_thread.shutdown(&self.handle.inner); 385 } else { 386 unreachable!("LocalRuntime only supports current-thread") 387 } 388 } 389 } 390 391 impl std::panic::UnwindSafe for LocalRuntime {} 392 393 impl std::panic::RefUnwindSafe for LocalRuntime {} 394