• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::future::poll_fn;
2 use crate::loom::sync::atomic::AtomicBool;
3 use crate::loom::sync::Arc;
4 use crate::runtime::driver::{self, Driver};
5 use crate::runtime::scheduler::{self, Defer, Inject};
6 use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
7 use crate::runtime::{blocking, context, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics};
8 use crate::sync::notify::Notify;
9 use crate::util::atomic_cell::AtomicCell;
10 use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
11 
12 use std::cell::RefCell;
13 use std::collections::VecDeque;
14 use std::fmt;
15 use std::future::Future;
16 use std::sync::atomic::Ordering::{AcqRel, Release};
17 use std::task::Poll::{Pending, Ready};
18 use std::task::Waker;
19 use std::time::Duration;
20 
21 /// Executes tasks on the current thread
22 pub(crate) struct CurrentThread {
23     /// Core scheduler data is acquired by a thread entering `block_on`.
24     core: AtomicCell<Core>,
25 
26     /// Notifier for waking up other threads to steal the
27     /// driver.
28     notify: Notify,
29 }
30 
31 /// Handle to the current thread scheduler
32 pub(crate) struct Handle {
33     /// Scheduler state shared across threads
34     shared: Shared,
35 
36     /// Resource driver handles
37     pub(crate) driver: driver::Handle,
38 
39     /// Blocking pool spawner
40     pub(crate) blocking_spawner: blocking::Spawner,
41 
42     /// Current random number generator seed
43     pub(crate) seed_generator: RngSeedGenerator,
44 }
45 
46 /// Data required for executing the scheduler. The struct is passed around to
47 /// a function that will perform the scheduling work and acts as a capability token.
48 struct Core {
49     /// Scheduler run queue
50     tasks: VecDeque<Notified>,
51 
52     /// Current tick
53     tick: u32,
54 
55     /// Runtime driver
56     ///
57     /// The driver is removed before starting to park the thread
58     driver: Option<Driver>,
59 
60     /// Metrics batch
61     metrics: MetricsBatch,
62 
63     /// How often to check the global queue
64     global_queue_interval: u32,
65 
66     /// True if a task panicked without being handled and the runtime is
67     /// configured to shutdown on unhandled panic.
68     unhandled_panic: bool,
69 }
70 
71 /// Scheduler state shared between threads.
72 struct Shared {
73     /// Remote run queue
74     inject: Inject<Arc<Handle>>,
75 
76     /// Collection of all active tasks spawned onto this executor.
77     owned: OwnedTasks<Arc<Handle>>,
78 
79     /// Indicates whether the blocked on thread was woken.
80     woken: AtomicBool,
81 
82     /// Scheduler configuration options
83     config: Config,
84 
85     /// Keeps track of various runtime metrics.
86     scheduler_metrics: SchedulerMetrics,
87 
88     /// This scheduler only has one worker.
89     worker_metrics: WorkerMetrics,
90 }
91 
92 /// Thread-local context.
93 ///
94 /// pub(crate) to store in `runtime::context`.
95 pub(crate) struct Context {
96     /// Scheduler handle
97     handle: Arc<Handle>,
98 
99     /// Scheduler core, enabling the holder of `Context` to execute the
100     /// scheduler.
101     core: RefCell<Option<Box<Core>>>,
102 
103     /// Deferred tasks, usually ones that called `task::yield_now()`.
104     pub(crate) defer: Defer,
105 }
106 
107 type Notified = task::Notified<Arc<Handle>>;
108 
109 /// Initial queue capacity.
110 const INITIAL_CAPACITY: usize = 64;
111 
112 /// Used if none is specified. This is a temporary constant and will be removed
113 /// as we unify tuning logic between the multi-thread and current-thread
114 /// schedulers.
115 const DEFAULT_GLOBAL_QUEUE_INTERVAL: u32 = 31;
116 
117 impl CurrentThread {
new( driver: Driver, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config, ) -> (CurrentThread, Arc<Handle>)118     pub(crate) fn new(
119         driver: Driver,
120         driver_handle: driver::Handle,
121         blocking_spawner: blocking::Spawner,
122         seed_generator: RngSeedGenerator,
123         config: Config,
124     ) -> (CurrentThread, Arc<Handle>) {
125         let worker_metrics = WorkerMetrics::from_config(&config);
126 
127         // Get the configured global queue interval, or use the default.
128         let global_queue_interval = config
129             .global_queue_interval
130             .unwrap_or(DEFAULT_GLOBAL_QUEUE_INTERVAL);
131 
132         let handle = Arc::new(Handle {
133             shared: Shared {
134                 inject: Inject::new(),
135                 owned: OwnedTasks::new(),
136                 woken: AtomicBool::new(false),
137                 config,
138                 scheduler_metrics: SchedulerMetrics::new(),
139                 worker_metrics,
140             },
141             driver: driver_handle,
142             blocking_spawner,
143             seed_generator,
144         });
145 
146         let core = AtomicCell::new(Some(Box::new(Core {
147             tasks: VecDeque::with_capacity(INITIAL_CAPACITY),
148             tick: 0,
149             driver: Some(driver),
150             metrics: MetricsBatch::new(&handle.shared.worker_metrics),
151             global_queue_interval,
152             unhandled_panic: false,
153         })));
154 
155         let scheduler = CurrentThread {
156             core,
157             notify: Notify::new(),
158         };
159 
160         (scheduler, handle)
161     }
162 
163     #[track_caller]
block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output164     pub(crate) fn block_on<F: Future>(&self, handle: &scheduler::Handle, future: F) -> F::Output {
165         pin!(future);
166 
167         crate::runtime::context::enter_runtime(handle, false, |blocking| {
168             let handle = handle.as_current_thread();
169 
170             // Attempt to steal the scheduler core and block_on the future if we can
171             // there, otherwise, lets select on a notification that the core is
172             // available or the future is complete.
173             loop {
174                 if let Some(core) = self.take_core(handle) {
175                     return core.block_on(future);
176                 } else {
177                     let notified = self.notify.notified();
178                     pin!(notified);
179 
180                     if let Some(out) = blocking
181                         .block_on(poll_fn(|cx| {
182                             if notified.as_mut().poll(cx).is_ready() {
183                                 return Ready(None);
184                             }
185 
186                             if let Ready(out) = future.as_mut().poll(cx) {
187                                 return Ready(Some(out));
188                             }
189 
190                             Pending
191                         }))
192                         .expect("Failed to `Enter::block_on`")
193                     {
194                         return out;
195                     }
196                 }
197             }
198         })
199     }
200 
take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>>201     fn take_core(&self, handle: &Arc<Handle>) -> Option<CoreGuard<'_>> {
202         let core = self.core.take()?;
203 
204         Some(CoreGuard {
205             context: scheduler::Context::CurrentThread(Context {
206                 handle: handle.clone(),
207                 core: RefCell::new(Some(core)),
208                 defer: Defer::new(),
209             }),
210             scheduler: self,
211         })
212     }
213 
shutdown(&mut self, handle: &scheduler::Handle)214     pub(crate) fn shutdown(&mut self, handle: &scheduler::Handle) {
215         let handle = handle.as_current_thread();
216 
217         // Avoid a double panic if we are currently panicking and
218         // the lock may be poisoned.
219 
220         let core = match self.take_core(handle) {
221             Some(core) => core,
222             None if std::thread::panicking() => return,
223             None => panic!("Oh no! We never placed the Core back, this is a bug!"),
224         };
225 
226         // Check that the thread-local is not being destroyed
227         let tls_available = context::with_current(|_| ()).is_ok();
228 
229         if tls_available {
230             core.enter(|core, _context| {
231                 let core = shutdown2(core, handle);
232                 (core, ())
233             });
234         } else {
235             // Shutdown without setting the context. `tokio::spawn` calls will
236             // fail, but those will fail either way because the thread-local is
237             // not available anymore.
238             let context = core.context.expect_current_thread();
239             let core = context.core.borrow_mut().take().unwrap();
240 
241             let core = shutdown2(core, handle);
242             *context.core.borrow_mut() = Some(core);
243         }
244     }
245 }
246 
shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core>247 fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
248     // Drain the OwnedTasks collection. This call also closes the
249     // collection, ensuring that no tasks are ever pushed after this
250     // call returns.
251     handle.shared.owned.close_and_shutdown_all();
252 
253     // Drain local queue
254     // We already shut down every task, so we just need to drop the task.
255     while let Some(task) = core.next_local_task(handle) {
256         drop(task);
257     }
258 
259     // Close the injection queue
260     handle.shared.inject.close();
261 
262     // Drain remote queue
263     while let Some(task) = handle.shared.inject.pop() {
264         drop(task);
265     }
266 
267     assert!(handle.shared.owned.is_empty());
268 
269     // Submit metrics
270     core.submit_metrics(handle);
271 
272     // Shutdown the resource drivers
273     if let Some(driver) = core.driver.as_mut() {
274         driver.shutdown(&handle.driver);
275     }
276 
277     core
278 }
279 
280 impl fmt::Debug for CurrentThread {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result281     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
282         fmt.debug_struct("CurrentThread").finish()
283     }
284 }
285 
286 // ===== impl Core =====
287 
288 impl Core {
289     /// Get and increment the current tick
tick(&mut self)290     fn tick(&mut self) {
291         self.tick = self.tick.wrapping_add(1);
292     }
293 
next_task(&mut self, handle: &Handle) -> Option<Notified>294     fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
295         if self.tick % self.global_queue_interval == 0 {
296             handle
297                 .next_remote_task()
298                 .or_else(|| self.next_local_task(handle))
299         } else {
300             self.next_local_task(handle)
301                 .or_else(|| handle.next_remote_task())
302         }
303     }
304 
next_local_task(&mut self, handle: &Handle) -> Option<Notified>305     fn next_local_task(&mut self, handle: &Handle) -> Option<Notified> {
306         let ret = self.tasks.pop_front();
307         handle
308             .shared
309             .worker_metrics
310             .set_queue_depth(self.tasks.len());
311         ret
312     }
313 
push_task(&mut self, handle: &Handle, task: Notified)314     fn push_task(&mut self, handle: &Handle, task: Notified) {
315         self.tasks.push_back(task);
316         self.metrics.inc_local_schedule_count();
317         handle
318             .shared
319             .worker_metrics
320             .set_queue_depth(self.tasks.len());
321     }
322 
submit_metrics(&mut self, handle: &Handle)323     fn submit_metrics(&mut self, handle: &Handle) {
324         self.metrics.submit(&handle.shared.worker_metrics, 0);
325     }
326 }
327 
328 #[cfg(tokio_taskdump)]
wake_deferred_tasks_and_free(context: &Context)329 fn wake_deferred_tasks_and_free(context: &Context) {
330     let wakers = context.defer.take_deferred();
331     for waker in wakers {
332         waker.wake();
333     }
334 }
335 
336 // ===== impl Context =====
337 
338 impl Context {
339     /// Execute the closure with the given scheduler core stored in the
340     /// thread-local context.
run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R)341     fn run_task<R>(&self, mut core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
342         core.metrics.start_poll();
343         let mut ret = self.enter(core, || crate::runtime::coop::budget(f));
344         ret.0.metrics.end_poll();
345         ret
346     }
347 
348     /// Blocks the current thread until an event is received by the driver,
349     /// including I/O events, timer events, ...
park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core>350     fn park(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
351         let mut driver = core.driver.take().expect("driver missing");
352 
353         if let Some(f) = &handle.shared.config.before_park {
354             // Incorrect lint, the closures are actually different types so `f`
355             // cannot be passed as an argument to `enter`.
356             #[allow(clippy::redundant_closure)]
357             let (c, _) = self.enter(core, || f());
358             core = c;
359         }
360 
361         // This check will fail if `before_park` spawns a task for us to run
362         // instead of parking the thread
363         if core.tasks.is_empty() {
364             // Park until the thread is signaled
365             core.metrics.about_to_park();
366             core.submit_metrics(handle);
367 
368             let (c, _) = self.enter(core, || {
369                 driver.park(&handle.driver);
370                 self.defer.wake();
371             });
372 
373             core = c;
374         }
375 
376         if let Some(f) = &handle.shared.config.after_unpark {
377             // Incorrect lint, the closures are actually different types so `f`
378             // cannot be passed as an argument to `enter`.
379             #[allow(clippy::redundant_closure)]
380             let (c, _) = self.enter(core, || f());
381             core = c;
382         }
383 
384         core.driver = Some(driver);
385         core
386     }
387 
388     /// Checks the driver for new events without blocking the thread.
park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core>389     fn park_yield(&self, mut core: Box<Core>, handle: &Handle) -> Box<Core> {
390         let mut driver = core.driver.take().expect("driver missing");
391 
392         core.submit_metrics(handle);
393 
394         let (mut core, _) = self.enter(core, || {
395             driver.park_timeout(&handle.driver, Duration::from_millis(0));
396             self.defer.wake();
397         });
398 
399         core.driver = Some(driver);
400         core
401     }
402 
enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R)403     fn enter<R>(&self, core: Box<Core>, f: impl FnOnce() -> R) -> (Box<Core>, R) {
404         // Store the scheduler core in the thread-local context
405         //
406         // A drop-guard is employed at a higher level.
407         *self.core.borrow_mut() = Some(core);
408 
409         // Execute the closure while tracking the execution budget
410         let ret = f();
411 
412         // Take the scheduler core back
413         let core = self.core.borrow_mut().take().expect("core missing");
414         (core, ret)
415     }
416 
defer(&self, waker: &Waker)417     pub(crate) fn defer(&self, waker: &Waker) {
418         self.defer.defer(waker);
419     }
420 }
421 
422 // ===== impl Handle =====
423 
424 impl Handle {
425     /// Spawns a future onto the `CurrentThread` scheduler
spawn<F>( me: &Arc<Self>, future: F, id: crate::runtime::task::Id, ) -> JoinHandle<F::Output> where F: crate::future::Future + Send + 'static, F::Output: Send + 'static,426     pub(crate) fn spawn<F>(
427         me: &Arc<Self>,
428         future: F,
429         id: crate::runtime::task::Id,
430     ) -> JoinHandle<F::Output>
431     where
432         F: crate::future::Future + Send + 'static,
433         F::Output: Send + 'static,
434     {
435         let (handle, notified) = me.shared.owned.bind(future, me.clone(), id);
436 
437         if let Some(notified) = notified {
438             me.schedule(notified);
439         }
440 
441         handle
442     }
443 
444     /// Capture a snapshot of this runtime's state.
445     #[cfg(all(
446         tokio_unstable,
447         tokio_taskdump,
448         target_os = "linux",
449         any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
450     ))]
dump(&self) -> crate::runtime::Dump451     pub(crate) fn dump(&self) -> crate::runtime::Dump {
452         use crate::runtime::dump;
453         use task::trace::trace_current_thread;
454 
455         let mut traces = vec![];
456 
457         // todo: how to make this work outside of a runtime context?
458         context::with_scheduler(|maybe_context| {
459             // drain the local queue
460             let context = if let Some(context) = maybe_context {
461                 context.expect_current_thread()
462             } else {
463                 return;
464             };
465             let mut maybe_core = context.core.borrow_mut();
466             let core = if let Some(core) = maybe_core.as_mut() {
467                 core
468             } else {
469                 return;
470             };
471             let local = &mut core.tasks;
472 
473             if self.shared.inject.is_closed() {
474                 return;
475             }
476 
477             traces = trace_current_thread(&self.shared.owned, local, &self.shared.inject)
478                 .into_iter()
479                 .map(dump::Task::new)
480                 .collect();
481 
482             // Avoid double borrow panic
483             drop(maybe_core);
484 
485             // Taking a taskdump could wakes every task, but we probably don't want
486             // the `yield_now` vector to be that large under normal circumstances.
487             // Therefore, we free its allocation.
488             wake_deferred_tasks_and_free(context);
489         });
490 
491         dump::Dump::new(traces)
492     }
493 
next_remote_task(&self) -> Option<Notified>494     fn next_remote_task(&self) -> Option<Notified> {
495         self.shared.inject.pop()
496     }
497 
waker_ref(me: &Arc<Self>) -> WakerRef<'_>498     fn waker_ref(me: &Arc<Self>) -> WakerRef<'_> {
499         // Set woken to true when enter block_on, ensure outer future
500         // be polled for the first time when enter loop
501         me.shared.woken.store(true, Release);
502         waker_ref(me)
503     }
504 
505     // reset woken to false and return original value
reset_woken(&self) -> bool506     pub(crate) fn reset_woken(&self) -> bool {
507         self.shared.woken.swap(false, AcqRel)
508     }
509 }
510 
511 cfg_metrics! {
512     impl Handle {
513         pub(crate) fn scheduler_metrics(&self) -> &SchedulerMetrics {
514             &self.shared.scheduler_metrics
515         }
516 
517         pub(crate) fn injection_queue_depth(&self) -> usize {
518             self.shared.inject.len()
519         }
520 
521         pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
522             assert_eq!(0, worker);
523             &self.shared.worker_metrics
524         }
525 
526         pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
527             self.worker_metrics(worker).queue_depth()
528         }
529 
530         pub(crate) fn num_blocking_threads(&self) -> usize {
531             self.blocking_spawner.num_threads()
532         }
533 
534         pub(crate) fn num_idle_blocking_threads(&self) -> usize {
535             self.blocking_spawner.num_idle_threads()
536         }
537 
538         pub(crate) fn blocking_queue_depth(&self) -> usize {
539             self.blocking_spawner.queue_depth()
540         }
541 
542         pub(crate) fn active_tasks_count(&self) -> usize {
543             self.shared.owned.active_tasks_count()
544         }
545     }
546 }
547 
548 cfg_unstable! {
549     use std::num::NonZeroU64;
550 
551     impl Handle {
552         pub(crate) fn owned_id(&self) -> NonZeroU64 {
553             self.shared.owned.id
554         }
555     }
556 }
557 
558 impl fmt::Debug for Handle {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result559     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
560         fmt.debug_struct("current_thread::Handle { ... }").finish()
561     }
562 }
563 
564 // ===== impl Shared =====
565 
566 impl Schedule for Arc<Handle> {
release(&self, task: &Task<Self>) -> Option<Task<Self>>567     fn release(&self, task: &Task<Self>) -> Option<Task<Self>> {
568         self.shared.owned.remove(task)
569     }
570 
schedule(&self, task: task::Notified<Self>)571     fn schedule(&self, task: task::Notified<Self>) {
572         use scheduler::Context::CurrentThread;
573 
574         context::with_scheduler(|maybe_cx| match maybe_cx {
575             Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
576                 let mut core = cx.core.borrow_mut();
577 
578                 // If `None`, the runtime is shutting down, so there is no need
579                 // to schedule the task.
580                 if let Some(core) = core.as_mut() {
581                     core.push_task(self, task);
582                 }
583             }
584             _ => {
585                 // Track that a task was scheduled from **outside** of the runtime.
586                 self.shared.scheduler_metrics.inc_remote_schedule_count();
587 
588                 // Schedule the task
589                 self.shared.inject.push(task);
590                 self.driver.unpark();
591             }
592         });
593     }
594 
595     cfg_unstable! {
596         fn unhandled_panic(&self) {
597             use crate::runtime::UnhandledPanic;
598 
599             match self.shared.config.unhandled_panic {
600                 UnhandledPanic::Ignore => {
601                     // Do nothing
602                 }
603                 UnhandledPanic::ShutdownRuntime => {
604                     use scheduler::Context::CurrentThread;
605 
606                     // This hook is only called from within the runtime, so
607                     // `context::with_scheduler` should match with `&self`, i.e.
608                     // there is no opportunity for a nested scheduler to be
609                     // called.
610                     context::with_scheduler(|maybe_cx| match maybe_cx {
611                         Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
612                             let mut core = cx.core.borrow_mut();
613 
614                             // If `None`, the runtime is shutting down, so there is no need to signal shutdown
615                             if let Some(core) = core.as_mut() {
616                                 core.unhandled_panic = true;
617                                 self.shared.owned.close_and_shutdown_all();
618                             }
619                         }
620                         _ => unreachable!("runtime core not set in CURRENT thread-local"),
621                     })
622                 }
623             }
624         }
625     }
626 }
627 
628 impl Wake for Handle {
wake(arc_self: Arc<Self>)629     fn wake(arc_self: Arc<Self>) {
630         Wake::wake_by_ref(&arc_self)
631     }
632 
633     /// Wake by reference
wake_by_ref(arc_self: &Arc<Self>)634     fn wake_by_ref(arc_self: &Arc<Self>) {
635         arc_self.shared.woken.store(true, Release);
636         arc_self.driver.unpark();
637     }
638 }
639 
640 // ===== CoreGuard =====
641 
642 /// Used to ensure we always place the `Core` value back into its slot in
643 /// `CurrentThread`, even if the future panics.
644 struct CoreGuard<'a> {
645     context: scheduler::Context,
646     scheduler: &'a CurrentThread,
647 }
648 
649 impl CoreGuard<'_> {
650     #[track_caller]
block_on<F: Future>(self, future: F) -> F::Output651     fn block_on<F: Future>(self, future: F) -> F::Output {
652         let ret = self.enter(|mut core, context| {
653             let waker = Handle::waker_ref(&context.handle);
654             let mut cx = std::task::Context::from_waker(&waker);
655 
656             pin!(future);
657 
658             core.metrics.start_processing_scheduled_tasks();
659 
660             'outer: loop {
661                 let handle = &context.handle;
662 
663                 if handle.reset_woken() {
664                     let (c, res) = context.enter(core, || {
665                         crate::runtime::coop::budget(|| future.as_mut().poll(&mut cx))
666                     });
667 
668                     core = c;
669 
670                     if let Ready(v) = res {
671                         return (core, Some(v));
672                     }
673                 }
674 
675                 for _ in 0..handle.shared.config.event_interval {
676                     // Make sure we didn't hit an unhandled_panic
677                     if core.unhandled_panic {
678                         return (core, None);
679                     }
680 
681                     core.tick();
682 
683                     let entry = core.next_task(handle);
684 
685                     let task = match entry {
686                         Some(entry) => entry,
687                         None => {
688                             core.metrics.end_processing_scheduled_tasks();
689 
690                             core = if !context.defer.is_empty() {
691                                 context.park_yield(core, handle)
692                             } else {
693                                 context.park(core, handle)
694                             };
695 
696                             core.metrics.start_processing_scheduled_tasks();
697 
698                             // Try polling the `block_on` future next
699                             continue 'outer;
700                         }
701                     };
702 
703                     let task = context.handle.shared.owned.assert_owner(task);
704 
705                     let (c, _) = context.run_task(core, || {
706                         task.run();
707                     });
708 
709                     core = c;
710                 }
711 
712                 core.metrics.end_processing_scheduled_tasks();
713 
714                 // Yield to the driver, this drives the timer and pulls any
715                 // pending I/O events.
716                 core = context.park_yield(core, handle);
717 
718                 core.metrics.start_processing_scheduled_tasks();
719             }
720         });
721 
722         match ret {
723             Some(ret) => ret,
724             None => {
725                 // `block_on` panicked.
726                 panic!("a spawned task panicked and the runtime is configured to shut down on unhandled panic");
727             }
728         }
729     }
730 
731     /// Enters the scheduler context. This sets the queue and other necessary
732     /// scheduler state in the thread-local.
enter<F, R>(self, f: F) -> R where F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),733     fn enter<F, R>(self, f: F) -> R
734     where
735         F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
736     {
737         let context = self.context.expect_current_thread();
738 
739         // Remove `core` from `context` to pass into the closure.
740         let core = context.core.borrow_mut().take().expect("core missing");
741 
742         // Call the closure and place `core` back
743         let (core, ret) = context::set_scheduler(&self.context, || f(core, context));
744 
745         *context.core.borrow_mut() = Some(core);
746 
747         ret
748     }
749 }
750 
751 impl Drop for CoreGuard<'_> {
drop(&mut self)752     fn drop(&mut self) {
753         let context = self.context.expect_current_thread();
754 
755         if let Some(core) = context.core.borrow_mut().take() {
756             // Replace old scheduler back into the state to allow
757             // other threads to pick it up and drive it.
758             self.scheduler.core.set(core);
759 
760             // Wake up other possible threads that could steal the driver.
761             self.scheduler.notify.notify_one()
762         }
763     }
764 }
765