use crate::loom::thread::AccessError; use crate::runtime::coop; use std::cell::Cell; #[cfg(any(feature = "rt", feature = "macros"))] use crate::util::rand::{FastRand, RngSeed}; cfg_rt! { use crate::runtime::{scheduler, task::Id, Defer}; use std::cell::RefCell; use std::marker::PhantomData; use std::time::Duration; } struct Context { /// Uniquely identifies the current thread #[cfg(feature = "rt")] thread_id: Cell>, /// Handle to the runtime scheduler running on the current thread. #[cfg(feature = "rt")] handle: RefCell>, #[cfg(feature = "rt")] current_task_id: Cell>, /// Tracks if the current thread is currently driving a runtime. /// Note, that if this is set to "entered", the current scheduler /// handle may not reference the runtime currently executing. This /// is because other runtime handles may be set to current from /// within a runtime. #[cfg(feature = "rt")] runtime: Cell, /// Yielded task wakers are stored here and notified after resource drivers /// are polled. #[cfg(feature = "rt")] defer: RefCell>, #[cfg(any(feature = "rt", feature = "macros"))] rng: FastRand, /// Tracks the amount of "work" a task may still do before yielding back to /// the sheduler budget: Cell, } tokio_thread_local! { static CONTEXT: Context = { Context { #[cfg(feature = "rt")] thread_id: Cell::new(None), /// Tracks the current runtime handle to use when spawning, /// accessing drivers, etc... #[cfg(feature = "rt")] handle: RefCell::new(None), #[cfg(feature = "rt")] current_task_id: Cell::new(None), /// Tracks if the current thread is currently driving a runtime. /// Note, that if this is set to "entered", the current scheduler /// handle may not reference the runtime currently executing. This /// is because other runtime handles may be set to current from /// within a runtime. #[cfg(feature = "rt")] runtime: Cell::new(EnterRuntime::NotEntered), #[cfg(feature = "rt")] defer: RefCell::new(None), #[cfg(any(feature = "rt", feature = "macros"))] rng: FastRand::new(RngSeed::new()), budget: Cell::new(coop::Budget::unconstrained()), } } } #[cfg(feature = "macros")] pub(crate) fn thread_rng_n(n: u32) -> u32 { CONTEXT.with(|ctx| ctx.rng.fastrand_n(n)) } pub(super) fn budget(f: impl FnOnce(&Cell) -> R) -> Result { CONTEXT.try_with(|ctx| f(&ctx.budget)) } cfg_rt! { use crate::runtime::{ThreadId, TryCurrentError}; use std::fmt; pub(crate) fn thread_id() -> Result { CONTEXT.try_with(|ctx| { match ctx.thread_id.get() { Some(id) => id, None => { let id = ThreadId::next(); ctx.thread_id.set(Some(id)); id } } }) } #[derive(Debug, Clone, Copy)] #[must_use] pub(crate) enum EnterRuntime { /// Currently in a runtime context. #[cfg_attr(not(feature = "rt"), allow(dead_code))] Entered { allow_block_in_place: bool }, /// Not in a runtime context **or** a blocking region. NotEntered, } #[derive(Debug)] #[must_use] pub(crate) struct SetCurrentGuard { old_handle: Option, old_seed: RngSeed, } /// Guard tracking that a caller has entered a runtime context. #[must_use] pub(crate) struct EnterRuntimeGuard { /// Tracks that the current thread has entered a blocking function call. pub(crate) blocking: BlockingRegionGuard, #[allow(dead_code)] // Only tracking the guard. pub(crate) handle: SetCurrentGuard, /// If true, then this is the root runtime guard. It is possible to nest /// runtime guards by using `block_in_place` between the calls. We need /// to track the root guard as this is the guard responsible for freeing /// the deferred task queue. is_root: bool, } /// Guard tracking that a caller has entered a blocking region. #[must_use] pub(crate) struct BlockingRegionGuard { _p: PhantomData>, } pub(crate) struct DisallowBlockInPlaceGuard(bool); pub(crate) fn set_current_task_id(id: Option) -> Option { CONTEXT.try_with(|ctx| ctx.current_task_id.replace(id)).unwrap_or(None) } pub(crate) fn current_task_id() -> Option { CONTEXT.try_with(|ctx| ctx.current_task_id.get()).unwrap_or(None) } pub(crate) fn try_current() -> Result { match CONTEXT.try_with(|ctx| ctx.handle.borrow().clone()) { Ok(Some(handle)) => Ok(handle), Ok(None) => Err(TryCurrentError::new_no_context()), Err(_access_error) => Err(TryCurrentError::new_thread_local_destroyed()), } } /// Sets this [`Handle`] as the current active [`Handle`]. /// /// [`Handle`]: crate::runtime::scheduler::Handle pub(crate) fn try_set_current(handle: &scheduler::Handle) -> Option { CONTEXT.try_with(|ctx| ctx.set_current(handle)).ok() } /// Marks the current thread as being within the dynamic extent of an /// executor. #[track_caller] pub(crate) fn enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> EnterRuntimeGuard { if let Some(enter) = try_enter_runtime(handle, allow_block_in_place) { return enter; } panic!( "Cannot start a runtime from within a runtime. This happens \ because a function (like `block_on`) attempted to block the \ current thread while the thread is being used to drive \ asynchronous tasks." ); } /// Tries to enter a runtime context, returns `None` if already in a runtime /// context. fn try_enter_runtime(handle: &scheduler::Handle, allow_block_in_place: bool) -> Option { CONTEXT.with(|c| { if c.runtime.get().is_entered() { None } else { // Set the entered flag c.runtime.set(EnterRuntime::Entered { allow_block_in_place }); // Initialize queue to track yielded tasks let mut defer = c.defer.borrow_mut(); let is_root = if defer.is_none() { *defer = Some(Defer::new()); true } else { false }; Some(EnterRuntimeGuard { blocking: BlockingRegionGuard::new(), handle: c.set_current(handle), is_root, }) } }) } pub(crate) fn try_enter_blocking_region() -> Option { CONTEXT.try_with(|c| { if c.runtime.get().is_entered() { None } else { Some(BlockingRegionGuard::new()) } // If accessing the thread-local fails, the thread is terminating // and thread-locals are being destroyed. Because we don't know if // we are currently in a runtime or not, we default to being // permissive. }).unwrap_or_else(|_| Some(BlockingRegionGuard::new())) } /// Disallows blocking in the current runtime context until the guard is dropped. pub(crate) fn disallow_block_in_place() -> DisallowBlockInPlaceGuard { let reset = CONTEXT.with(|c| { if let EnterRuntime::Entered { allow_block_in_place: true, } = c.runtime.get() { c.runtime.set(EnterRuntime::Entered { allow_block_in_place: false, }); true } else { false } }); DisallowBlockInPlaceGuard(reset) } pub(crate) fn with_defer(f: impl FnOnce(&mut Defer) -> R) -> Option { CONTEXT.with(|c| { let mut defer = c.defer.borrow_mut(); defer.as_mut().map(f) }) } impl Context { fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard { let rng_seed = handle.seed_generator().next_seed(); let old_handle = self.handle.borrow_mut().replace(handle.clone()); let old_seed = self.rng.replace_seed(rng_seed); SetCurrentGuard { old_handle, old_seed, } } } impl Drop for SetCurrentGuard { fn drop(&mut self) { CONTEXT.with(|ctx| { *ctx.handle.borrow_mut() = self.old_handle.take(); ctx.rng.replace_seed(self.old_seed.clone()); }); } } impl fmt::Debug for EnterRuntimeGuard { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Enter").finish() } } impl Drop for EnterRuntimeGuard { fn drop(&mut self) { CONTEXT.with(|c| { assert!(c.runtime.get().is_entered()); c.runtime.set(EnterRuntime::NotEntered); if self.is_root { *c.defer.borrow_mut() = None; } }); } } impl BlockingRegionGuard { fn new() -> BlockingRegionGuard { BlockingRegionGuard { _p: PhantomData } } /// Blocks the thread on the specified future, returning the value with /// which that future completes. pub(crate) fn block_on(&mut self, f: F) -> Result where F: std::future::Future, { use crate::runtime::park::CachedParkThread; let mut park = CachedParkThread::new(); park.block_on(f) } /// Blocks the thread on the specified future for **at most** `timeout` /// /// If the future completes before `timeout`, the result is returned. If /// `timeout` elapses, then `Err` is returned. pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result where F: std::future::Future, { use crate::runtime::park::CachedParkThread; use std::task::Context; use std::task::Poll::Ready; use std::time::Instant; let mut park = CachedParkThread::new(); let waker = park.waker().map_err(|_| ())?; let mut cx = Context::from_waker(&waker); pin!(f); let when = Instant::now() + timeout; loop { if let Ready(v) = crate::runtime::coop::budget(|| f.as_mut().poll(&mut cx)) { return Ok(v); } let now = Instant::now(); if now >= when { return Err(()); } // Wake any yielded tasks before parking in order to avoid // blocking. with_defer(|defer| defer.wake()); park.park_timeout(when - now); } } } impl Drop for DisallowBlockInPlaceGuard { fn drop(&mut self) { if self.0 { // XXX: Do we want some kind of assertion here, or is "best effort" okay? CONTEXT.with(|c| { if let EnterRuntime::Entered { allow_block_in_place: false, } = c.runtime.get() { c.runtime.set(EnterRuntime::Entered { allow_block_in_place: true, }); } }) } } } impl EnterRuntime { pub(crate) fn is_entered(self) -> bool { matches!(self, EnterRuntime::Entered { .. }) } } } // Forces the current "entered" state to be cleared while the closure // is executed. // // # Warning // // This is hidden for a reason. Do not use without fully understanding // executors. Misusing can easily cause your program to deadlock. cfg_rt_multi_thread! { /// Returns true if in a runtime context. pub(crate) fn current_enter_context() -> EnterRuntime { CONTEXT.with(|c| c.runtime.get()) } pub(crate) fn exit_runtime R, R>(f: F) -> R { // Reset in case the closure panics struct Reset(EnterRuntime); impl Drop for Reset { fn drop(&mut self) { CONTEXT.with(|c| { assert!(!c.runtime.get().is_entered(), "closure claimed permanent executor"); c.runtime.set(self.0); }); } } let was = CONTEXT.with(|c| { let e = c.runtime.get(); assert!(e.is_entered(), "asked to exit when not entered"); c.runtime.set(EnterRuntime::NotEntered); e }); let _reset = Reset(was); // dropping _reset after f() will reset ENTERED f() } }