1 use crate::runtime::blocking::{BlockingTask, NoopSchedule}; 2 use crate::runtime::task::{self, JoinHandle}; 3 use crate::runtime::{blocking, context, driver, Spawner}; 4 use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; 5 6 use std::future::Future; 7 use std::marker::PhantomData; 8 use std::{error, fmt}; 9 10 /// Handle to the runtime. 11 /// 12 /// The handle is internally reference-counted and can be freely cloned. A handle can be 13 /// obtained using the [`Runtime::handle`] method. 14 /// 15 /// [`Runtime::handle`]: crate::runtime::Runtime::handle() 16 #[derive(Debug, Clone)] 17 pub struct Handle { 18 pub(super) spawner: Spawner, 19 20 /// Handles to the I/O drivers 21 #[cfg_attr( 22 not(any(feature = "net", feature = "process", all(unix, feature = "signal"))), 23 allow(dead_code) 24 )] 25 pub(super) io_handle: driver::IoHandle, 26 27 /// Handles to the signal drivers 28 #[cfg_attr( 29 not(any(feature = "signal", all(unix, feature = "process"))), 30 allow(dead_code) 31 )] 32 pub(super) signal_handle: driver::SignalHandle, 33 34 /// Handles to the time drivers 35 #[cfg_attr(not(feature = "time"), allow(dead_code))] 36 pub(super) time_handle: driver::TimeHandle, 37 38 /// Source of `Instant::now()` 39 #[cfg_attr(not(all(feature = "time", feature = "test-util")), allow(dead_code))] 40 pub(super) clock: driver::Clock, 41 42 /// Blocking pool spawner 43 pub(super) blocking_spawner: blocking::Spawner, 44 } 45 46 /// Runtime context guard. 47 /// 48 /// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits 49 /// the runtime context on drop. 50 /// 51 /// [`Runtime::enter`]: fn@crate::runtime::Runtime::enter 52 #[derive(Debug)] 53 #[must_use = "Creating and dropping a guard does nothing"] 54 pub struct EnterGuard<'a> { 55 _guard: context::EnterGuard, 56 _handle_lifetime: PhantomData<&'a Handle>, 57 } 58 59 impl Handle { 60 /// Enters the runtime context. This allows you to construct types that must 61 /// have an executor available on creation such as [`Sleep`] or [`TcpStream`]. 62 /// It will also allow you to call methods such as [`tokio::spawn`]. 63 /// 64 /// [`Sleep`]: struct@crate::time::Sleep 65 /// [`TcpStream`]: struct@crate::net::TcpStream 66 /// [`tokio::spawn`]: fn@crate::spawn enter(&self) -> EnterGuard<'_>67 pub fn enter(&self) -> EnterGuard<'_> { 68 EnterGuard { 69 _guard: context::enter(self.clone()), 70 _handle_lifetime: PhantomData, 71 } 72 } 73 74 /// Returns a `Handle` view over the currently running `Runtime`. 75 /// 76 /// # Panic 77 /// 78 /// This will panic if called outside the context of a Tokio runtime. That means that you must 79 /// call this on one of the threads **being run by the runtime**. Calling this from within a 80 /// thread created by `std::thread::spawn` (for example) will cause a panic. 81 /// 82 /// # Examples 83 /// 84 /// This can be used to obtain the handle of the surrounding runtime from an async 85 /// block or function running on that runtime. 86 /// 87 /// ``` 88 /// # use std::thread; 89 /// # use tokio::runtime::Runtime; 90 /// # fn dox() { 91 /// # let rt = Runtime::new().unwrap(); 92 /// # rt.spawn(async { 93 /// use tokio::runtime::Handle; 94 /// 95 /// // Inside an async block or function. 96 /// let handle = Handle::current(); 97 /// handle.spawn(async { 98 /// println!("now running in the existing Runtime"); 99 /// }); 100 /// 101 /// # let handle = 102 /// thread::spawn(move || { 103 /// // Notice that the handle is created outside of this thread and then moved in 104 /// handle.spawn(async { /* ... */ }) 105 /// // This next line would cause a panic 106 /// // let handle2 = Handle::current(); 107 /// }); 108 /// # handle.join().unwrap(); 109 /// # }); 110 /// # } 111 /// ``` current() -> Self112 pub fn current() -> Self { 113 context::current() 114 } 115 116 /// Returns a Handle view over the currently running Runtime 117 /// 118 /// Returns an error if no Runtime has been started 119 /// 120 /// Contrary to `current`, this never panics try_current() -> Result<Self, TryCurrentError>121 pub fn try_current() -> Result<Self, TryCurrentError> { 122 context::try_current() 123 } 124 125 cfg_stats! { 126 /// Returns a view that lets you get information about how the runtime 127 /// is performing. 128 pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats { 129 self.spawner.stats() 130 } 131 } 132 133 /// Spawns a future onto the Tokio runtime. 134 /// 135 /// This spawns the given future onto the runtime's executor, usually a 136 /// thread pool. The thread pool is then responsible for polling the future 137 /// until it completes. 138 /// 139 /// See [module level][mod] documentation for more details. 140 /// 141 /// [mod]: index.html 142 /// 143 /// # Examples 144 /// 145 /// ``` 146 /// use tokio::runtime::Runtime; 147 /// 148 /// # fn dox() { 149 /// // Create the runtime 150 /// let rt = Runtime::new().unwrap(); 151 /// // Get a handle from this runtime 152 /// let handle = rt.handle(); 153 /// 154 /// // Spawn a future onto the runtime using the handle 155 /// handle.spawn(async { 156 /// println!("now running on a worker thread"); 157 /// }); 158 /// # } 159 /// ``` 160 #[cfg_attr(tokio_track_caller, track_caller)] spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: Future + Send + 'static, F::Output: Send + 'static,161 pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output> 162 where 163 F: Future + Send + 'static, 164 F::Output: Send + 'static, 165 { 166 #[cfg(all(tokio_unstable, feature = "tracing"))] 167 let future = crate::util::trace::task(future, "task", None); 168 self.spawner.spawn(future) 169 } 170 171 /// Runs the provided function on an executor dedicated to blocking. 172 /// operations. 173 /// 174 /// # Examples 175 /// 176 /// ``` 177 /// use tokio::runtime::Runtime; 178 /// 179 /// # fn dox() { 180 /// // Create the runtime 181 /// let rt = Runtime::new().unwrap(); 182 /// // Get a handle from this runtime 183 /// let handle = rt.handle(); 184 /// 185 /// // Spawn a blocking function onto the runtime using the handle 186 /// handle.spawn_blocking(|| { 187 /// println!("now running on a worker thread"); 188 /// }); 189 /// # } 190 #[cfg_attr(tokio_track_caller, track_caller)] spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,191 pub fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R> 192 where 193 F: FnOnce() -> R + Send + 'static, 194 R: Send + 'static, 195 { 196 if cfg!(debug_assertions) && std::mem::size_of::<F>() > 2048 { 197 self.spawn_blocking_inner(Box::new(func), None) 198 } else { 199 self.spawn_blocking_inner(func, None) 200 } 201 } 202 203 #[cfg_attr(tokio_track_caller, track_caller)] spawn_blocking_inner<F, R>(&self, func: F, name: Option<&str>) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static, R: Send + 'static,204 pub(crate) fn spawn_blocking_inner<F, R>(&self, func: F, name: Option<&str>) -> JoinHandle<R> 205 where 206 F: FnOnce() -> R + Send + 'static, 207 R: Send + 'static, 208 { 209 let fut = BlockingTask::new(func); 210 211 #[cfg(all(tokio_unstable, feature = "tracing"))] 212 let fut = { 213 use tracing::Instrument; 214 #[cfg(tokio_track_caller)] 215 let location = std::panic::Location::caller(); 216 #[cfg(tokio_track_caller)] 217 let span = tracing::trace_span!( 218 target: "tokio::task::blocking", 219 "runtime.spawn", 220 kind = %"blocking", 221 task.name = %name.unwrap_or_default(), 222 "fn" = %std::any::type_name::<F>(), 223 spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), 224 ); 225 #[cfg(not(tokio_track_caller))] 226 let span = tracing::trace_span!( 227 target: "tokio::task::blocking", 228 "runtime.spawn", 229 kind = %"blocking", 230 task.name = %name.unwrap_or_default(), 231 "fn" = %std::any::type_name::<F>(), 232 ); 233 fut.instrument(span) 234 }; 235 236 #[cfg(not(all(tokio_unstable, feature = "tracing")))] 237 let _ = name; 238 239 let (task, handle) = task::unowned(fut, NoopSchedule); 240 let _ = self.blocking_spawner.spawn(task, self); 241 handle 242 } 243 244 /// Runs a future to completion on this `Handle`'s associated `Runtime`. 245 /// 246 /// This runs the given future on the current thread, blocking until it is 247 /// complete, and yielding its resolved result. Any tasks or timers which 248 /// the future spawns internally will be executed on the runtime. 249 /// 250 /// When this is used on a `current_thread` runtime, only the 251 /// [`Runtime::block_on`] method can drive the IO and timer drivers, but the 252 /// `Handle::block_on` method cannot drive them. This means that, when using 253 /// this method on a current_thread runtime, anything that relies on IO or 254 /// timers will not work unless there is another thread currently calling 255 /// [`Runtime::block_on`] on the same runtime. 256 /// 257 /// # If the runtime has been shut down 258 /// 259 /// If the `Handle`'s associated `Runtime` has been shut down (through 260 /// [`Runtime::shutdown_background`], [`Runtime::shutdown_timeout`], or by 261 /// dropping it) and `Handle::block_on` is used it might return an error or 262 /// panic. Specifically IO resources will return an error and timers will 263 /// panic. Runtime independent futures will run as normal. 264 /// 265 /// # Panics 266 /// 267 /// This function panics if the provided future panics, if called within an 268 /// asynchronous execution context, or if a timer future is executed on a 269 /// runtime that has been shut down. 270 /// 271 /// # Examples 272 /// 273 /// ``` 274 /// use tokio::runtime::Runtime; 275 /// 276 /// // Create the runtime 277 /// let rt = Runtime::new().unwrap(); 278 /// 279 /// // Get a handle from this runtime 280 /// let handle = rt.handle(); 281 /// 282 /// // Execute the future, blocking the current thread until completion 283 /// handle.block_on(async { 284 /// println!("hello"); 285 /// }); 286 /// ``` 287 /// 288 /// Or using `Handle::current`: 289 /// 290 /// ``` 291 /// use tokio::runtime::Handle; 292 /// 293 /// #[tokio::main] 294 /// async fn main () { 295 /// let handle = Handle::current(); 296 /// std::thread::spawn(move || { 297 /// // Using Handle::block_on to run async code in the new thread. 298 /// handle.block_on(async { 299 /// println!("hello"); 300 /// }); 301 /// }); 302 /// } 303 /// ``` 304 /// 305 /// [`JoinError`]: struct@crate::task::JoinError 306 /// [`JoinHandle`]: struct@crate::task::JoinHandle 307 /// [`Runtime::block_on`]: fn@crate::runtime::Runtime::block_on 308 /// [`Runtime::shutdown_background`]: fn@crate::runtime::Runtime::shutdown_background 309 /// [`Runtime::shutdown_timeout`]: fn@crate::runtime::Runtime::shutdown_timeout 310 /// [`spawn_blocking`]: crate::task::spawn_blocking 311 /// [`tokio::fs`]: crate::fs 312 /// [`tokio::net`]: crate::net 313 /// [`tokio::time`]: crate::time 314 #[cfg_attr(tokio_track_caller, track_caller)] block_on<F: Future>(&self, future: F) -> F::Output315 pub fn block_on<F: Future>(&self, future: F) -> F::Output { 316 #[cfg(all(tokio_unstable, feature = "tracing"))] 317 let future = crate::util::trace::task(future, "block_on", None); 318 319 // Enter the **runtime** context. This configures spawning, the current I/O driver, ... 320 let _rt_enter = self.enter(); 321 322 // Enter a **blocking** context. This prevents blocking from a runtime. 323 let mut blocking_enter = crate::runtime::enter(true); 324 325 // Block on the future 326 blocking_enter 327 .block_on(future) 328 .expect("failed to park thread") 329 } 330 shutdown(mut self)331 pub(crate) fn shutdown(mut self) { 332 self.spawner.shutdown(); 333 } 334 } 335 336 /// Error returned by `try_current` when no Runtime has been started 337 #[derive(Debug)] 338 pub struct TryCurrentError { 339 kind: TryCurrentErrorKind, 340 } 341 342 impl TryCurrentError { new_no_context() -> Self343 pub(crate) fn new_no_context() -> Self { 344 Self { 345 kind: TryCurrentErrorKind::NoContext, 346 } 347 } 348 new_thread_local_destroyed() -> Self349 pub(crate) fn new_thread_local_destroyed() -> Self { 350 Self { 351 kind: TryCurrentErrorKind::ThreadLocalDestroyed, 352 } 353 } 354 355 /// Returns true if the call failed because there is currently no runtime in 356 /// the Tokio context. is_missing_context(&self) -> bool357 pub fn is_missing_context(&self) -> bool { 358 matches!(self.kind, TryCurrentErrorKind::NoContext) 359 } 360 361 /// Returns true if the call failed because the Tokio context thread-local 362 /// had been destroyed. This can usually only happen if in the destructor of 363 /// other thread-locals. is_thread_local_destroyed(&self) -> bool364 pub fn is_thread_local_destroyed(&self) -> bool { 365 matches!(self.kind, TryCurrentErrorKind::ThreadLocalDestroyed) 366 } 367 } 368 369 enum TryCurrentErrorKind { 370 NoContext, 371 ThreadLocalDestroyed, 372 } 373 374 impl fmt::Debug for TryCurrentErrorKind { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result375 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 376 use TryCurrentErrorKind::*; 377 match self { 378 NoContext => f.write_str("NoContext"), 379 ThreadLocalDestroyed => f.write_str("ThreadLocalDestroyed"), 380 } 381 } 382 } 383 384 impl fmt::Display for TryCurrentError { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result385 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 386 use TryCurrentErrorKind::*; 387 match self.kind { 388 NoContext => f.write_str(CONTEXT_MISSING_ERROR), 389 ThreadLocalDestroyed => f.write_str(THREAD_LOCAL_DESTROYED_ERROR), 390 } 391 } 392 } 393 394 impl error::Error for TryCurrentError {} 395