1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
6 //!
7 //! # Shutdown
8 //!
9 //! Shutting down the runtime involves the following steps:
10 //!
11 //! 1. The Shared::close method is called. This closes the inject queue and
12 //! OwnedTasks instance and wakes up all worker threads.
13 //!
14 //! 2. Each worker thread observes the close signal next time it runs
15 //! Core::maintenance by checking whether the inject queue is closed.
16 //! The Core::is_shutdown flag is set to true.
17 //!
//! 3. Each worker thread calls `pre_shutdown`; the workers do this in
//!    parallel. Here, the worker will keep removing tasks from OwnedTasks
//!    until it is empty. No new tasks can be pushed to the OwnedTasks during
//!    or after this step as it was closed in step 1.
22 //!
23 //! 5. The workers call Shared::shutdown to enter the single-threaded phase of
24 //! shutdown. These calls will push their core to Shared::shutdown_cores,
25 //! and the last thread to push its core will finish the shutdown procedure.
26 //!
27 //! 6. The local run queue of each core is emptied, then the inject queue is
28 //! emptied.
29 //!
30 //! At this point, shutdown has completed. It is not possible for any of the
31 //! collections to contain any tasks at this point, as each collection was
32 //! closed first, then emptied afterwards.
33 //!
34 //! ## Spawns during shutdown
35 //!
36 //! When spawning tasks during shutdown, there are two cases:
37 //!
38 //! * The spawner observes the OwnedTasks being open, and the inject queue is
39 //! closed.
40 //! * The spawner observes the OwnedTasks being closed and doesn't check the
41 //! inject queue.
42 //!
43 //! The first case can only happen if the OwnedTasks::bind call happens before
44 //! or during step 1 of shutdown. In this case, the runtime will clean up the
45 //! task in step 3 of shutdown.
46 //!
47 //! In the latter case, the task was not spawned and the task is immediately
48 //! cancelled by the spawner.
49 //!
//! The correctness of shutdown requires both the inject queue and OwnedTasks
//! collection to have a close bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the OwnedTasks,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 6 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
58
59 use crate::loom::sync::{Arc, Mutex};
60 use crate::runtime;
61 use crate::runtime::context;
62 use crate::runtime::scheduler::multi_thread::{queue, Handle, Idle, Parker, Unparker};
63 use crate::runtime::task::{Inject, OwnedTasks};
64 use crate::runtime::{
65 blocking, coop, driver, scheduler, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
66 };
67 use crate::util::atomic_cell::AtomicCell;
68 use crate::util::rand::{FastRand, RngSeedGenerator};
69
70 use std::cell::RefCell;
71 use std::time::Duration;
72
73 /// A scheduler worker
74 pub(super) struct Worker {
75 /// Reference to scheduler's handle
76 handle: Arc<Handle>,
77
78 /// Index holding this worker's remote state
79 index: usize,
80
81 /// Used to hand-off a worker's core to another thread.
82 core: AtomicCell<Core>,
83 }
84
85 /// Core data
86 struct Core {
87 /// Used to schedule bookkeeping tasks every so often.
88 tick: u32,
89
90 /// When a task is scheduled from a worker, it is stored in this slot. The
91 /// worker will check this slot for a task **before** checking the run
92 /// queue. This effectively results in the **last** scheduled task to be run
93 /// next (LIFO). This is an optimization for message passing patterns and
94 /// helps to reduce latency.
95 lifo_slot: Option<Notified>,
96
97 /// The worker-local run queue.
98 run_queue: queue::Local<Arc<Handle>>,
99
100 /// True if the worker is currently searching for more work. Searching
101 /// involves attempting to steal from other workers.
102 is_searching: bool,
103
104 /// True if the scheduler is being shutdown
105 is_shutdown: bool,
106
107 /// Parker
108 ///
109 /// Stored in an `Option` as the parker is added / removed to make the
110 /// borrow checker happy.
111 park: Option<Parker>,
112
113 /// Batching metrics so they can be submitted to RuntimeMetrics.
114 metrics: MetricsBatch,
115
116 /// Fast random number generator.
117 rand: FastRand,
118 }
119
120 /// State shared across all workers
121 pub(super) struct Shared {
122 /// Per-worker remote state. All other workers have access to this and is
123 /// how they communicate between each other.
124 remotes: Box<[Remote]>,
125
126 /// Global task queue used for:
127 /// 1. Submit work to the scheduler while **not** currently on a worker thread.
128 /// 2. Submit work to the scheduler when a worker run queue is saturated
129 inject: Inject<Arc<Handle>>,
130
131 /// Coordinates idle workers
132 idle: Idle,
133
134 /// Collection of all active tasks spawned onto this executor.
135 pub(super) owned: OwnedTasks<Arc<Handle>>,
136
137 /// Cores that have observed the shutdown signal
138 ///
139 /// The core is **not** placed back in the worker to avoid it from being
140 /// stolen by a thread that was spawned as part of `block_in_place`.
141 #[allow(clippy::vec_box)] // we're moving an already-boxed value
142 shutdown_cores: Mutex<Vec<Box<Core>>>,
143
144 /// Scheduler configuration options
145 config: Config,
146
147 /// Collects metrics from the runtime.
148 pub(super) scheduler_metrics: SchedulerMetrics,
149
150 pub(super) worker_metrics: Box<[WorkerMetrics]>,
151 }
152
153 /// Used to communicate with a worker from other threads.
154 struct Remote {
155 /// Steals tasks from this worker.
156 steal: queue::Steal<Arc<Handle>>,
157
158 /// Unparks the associated worker thread
159 unpark: Unparker,
160 }
161
162 /// Thread-local context
163 struct Context {
164 /// Worker
165 worker: Arc<Worker>,
166
167 /// Core data
168 core: RefCell<Option<Box<Core>>>,
169 }
170
171 /// Starts the workers
172 pub(crate) struct Launch(Vec<Arc<Worker>>);
173
174 /// Running a task may consume the core. If the core is still available when
175 /// running the task completes, it is returned. Otherwise, the worker will need
176 /// to stop processing.
177 type RunResult = Result<Box<Core>, ()>;
178
179 /// A task handle
180 type Task = task::Task<Arc<Handle>>;
181
182 /// A notified task handle
183 type Notified = task::Notified<Arc<Handle>>;
184
185 // Tracks thread-local state
186 scoped_thread_local!(static CURRENT: Context);
187
create( size: usize, park: Parker, driver_handle: driver::Handle, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config, ) -> (Arc<Handle>, Launch)188 pub(super) fn create(
189 size: usize,
190 park: Parker,
191 driver_handle: driver::Handle,
192 blocking_spawner: blocking::Spawner,
193 seed_generator: RngSeedGenerator,
194 config: Config,
195 ) -> (Arc<Handle>, Launch) {
196 let mut cores = Vec::with_capacity(size);
197 let mut remotes = Vec::with_capacity(size);
198 let mut worker_metrics = Vec::with_capacity(size);
199
200 // Create the local queues
201 for _ in 0..size {
202 let (steal, run_queue) = queue::local();
203
204 let park = park.clone();
205 let unpark = park.unpark();
206
207 cores.push(Box::new(Core {
208 tick: 0,
209 lifo_slot: None,
210 run_queue,
211 is_searching: false,
212 is_shutdown: false,
213 park: Some(park),
214 metrics: MetricsBatch::new(),
215 rand: FastRand::new(config.seed_generator.next_seed()),
216 }));
217
218 remotes.push(Remote { steal, unpark });
219 worker_metrics.push(WorkerMetrics::new());
220 }
221
222 let handle = Arc::new(Handle {
223 shared: Shared {
224 remotes: remotes.into_boxed_slice(),
225 inject: Inject::new(),
226 idle: Idle::new(size),
227 owned: OwnedTasks::new(),
228 shutdown_cores: Mutex::new(vec![]),
229 config,
230 scheduler_metrics: SchedulerMetrics::new(),
231 worker_metrics: worker_metrics.into_boxed_slice(),
232 },
233 driver: driver_handle,
234 blocking_spawner,
235 seed_generator,
236 });
237
238 let mut launch = Launch(vec![]);
239
240 for (index, core) in cores.drain(..).enumerate() {
241 launch.0.push(Arc::new(Worker {
242 handle: handle.clone(),
243 index,
244 core: AtomicCell::new(Some(core)),
245 }));
246 }
247
248 (handle, launch)
249 }
250
251 #[track_caller]
block_in_place<F, R>(f: F) -> R where F: FnOnce() -> R,252 pub(crate) fn block_in_place<F, R>(f: F) -> R
253 where
254 F: FnOnce() -> R,
255 {
256 // Try to steal the worker core back
257 struct Reset(coop::Budget);
258
259 impl Drop for Reset {
260 fn drop(&mut self) {
261 CURRENT.with(|maybe_cx| {
262 if let Some(cx) = maybe_cx {
263 let core = cx.worker.core.take();
264 let mut cx_core = cx.core.borrow_mut();
265 assert!(cx_core.is_none());
266 *cx_core = core;
267
268 // Reset the task budget as we are re-entering the
269 // runtime.
270 coop::set(self.0);
271 }
272 });
273 }
274 }
275
276 let mut had_entered = false;
277
278 let setup_result = CURRENT.with(|maybe_cx| {
279 match (
280 crate::runtime::context::current_enter_context(),
281 maybe_cx.is_some(),
282 ) {
283 (context::EnterRuntime::Entered { .. }, true) => {
284 // We are on a thread pool runtime thread, so we just need to
285 // set up blocking.
286 had_entered = true;
287 }
288 (
289 context::EnterRuntime::Entered {
290 allow_block_in_place,
291 },
292 false,
293 ) => {
294 // We are on an executor, but _not_ on the thread pool. That is
295 // _only_ okay if we are in a thread pool runtime's block_on
296 // method:
297 if allow_block_in_place {
298 had_entered = true;
299 return Ok(());
300 } else {
301 // This probably means we are on the current_thread runtime or in a
302 // LocalSet, where it is _not_ okay to block.
303 return Err(
304 "can call blocking only when running on the multi-threaded runtime",
305 );
306 }
307 }
308 (context::EnterRuntime::NotEntered, true) => {
309 // This is a nested call to block_in_place (we already exited).
310 // All the necessary setup has already been done.
311 return Ok(());
312 }
313 (context::EnterRuntime::NotEntered, false) => {
314 // We are outside of the tokio runtime, so blocking is fine.
315 // We can also skip all of the thread pool blocking setup steps.
316 return Ok(());
317 }
318 }
319
320 let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
321
322 // Get the worker core. If none is set, then blocking is fine!
323 let core = match cx.core.borrow_mut().take() {
324 Some(core) => core,
325 None => return Ok(()),
326 };
327
328 // The parker should be set here
329 assert!(core.park.is_some());
330
331 // In order to block, the core must be sent to another thread for
332 // execution.
333 //
334 // First, move the core back into the worker's shared core slot.
335 cx.worker.core.set(core);
336
337 // Next, clone the worker handle and send it to a new thread for
338 // processing.
339 //
340 // Once the blocking task is done executing, we will attempt to
341 // steal the core back.
342 let worker = cx.worker.clone();
343 runtime::spawn_blocking(move || run(worker));
344 Ok(())
345 });
346
347 if let Err(panic_message) = setup_result {
348 panic!("{}", panic_message);
349 }
350
351 if had_entered {
352 // Unset the current task's budget. Blocking sections are not
353 // constrained by task budgets.
354 let _reset = Reset(coop::stop());
355
356 crate::runtime::context::exit_runtime(f)
357 } else {
358 f()
359 }
360 }
361
362 impl Launch {
launch(mut self)363 pub(crate) fn launch(mut self) {
364 for worker in self.0.drain(..) {
365 runtime::spawn_blocking(move || run(worker));
366 }
367 }
368 }
369
run(worker: Arc<Worker>)370 fn run(worker: Arc<Worker>) {
371 struct AbortOnPanic;
372
373 impl Drop for AbortOnPanic {
374 fn drop(&mut self) {
375 if std::thread::panicking() {
376 eprintln!("worker thread panicking; aborting process");
377 std::process::abort();
378 }
379 }
380 }
381
382 // Catching panics on worker threads in tests is quite tricky. Instead, when
383 // debug assertions are enabled, we just abort the process.
384 #[cfg(debug_assertions)]
385 let _abort_on_panic = AbortOnPanic;
386
387 // Acquire a core. If this fails, then another thread is running this
388 // worker and there is nothing further to do.
389 let core = match worker.core.take() {
390 Some(core) => core,
391 None => return,
392 };
393
394 let handle = scheduler::Handle::MultiThread(worker.handle.clone());
395 let _enter = crate::runtime::context::enter_runtime(&handle, true);
396
397 // Set the worker context.
398 let cx = Context {
399 worker,
400 core: RefCell::new(None),
401 };
402
403 CURRENT.set(&cx, || {
404 // This should always be an error. It only returns a `Result` to support
405 // using `?` to short circuit.
406 assert!(cx.run(core).is_err());
407
408 // Check if there are any deferred tasks to notify. This can happen when
409 // the worker core is lost due to `block_in_place()` being called from
410 // within the task.
411 wake_deferred_tasks();
412 });
413 }
414
415 impl Context {
run(&self, mut core: Box<Core>) -> RunResult416 fn run(&self, mut core: Box<Core>) -> RunResult {
417 while !core.is_shutdown {
418 // Increment the tick
419 core.tick();
420
421 // Run maintenance, if needed
422 core = self.maintenance(core);
423
424 // First, check work available to the current worker.
425 if let Some(task) = core.next_task(&self.worker) {
426 core = self.run_task(task, core)?;
427 continue;
428 }
429
430 // There is no more **local** work to process, try to steal work
431 // from other workers.
432 if let Some(task) = core.steal_work(&self.worker) {
433 core = self.run_task(task, core)?;
434 } else {
435 // Wait for work
436 core = if did_defer_tasks() {
437 self.park_timeout(core, Some(Duration::from_millis(0)))
438 } else {
439 self.park(core)
440 };
441 }
442 }
443
444 core.pre_shutdown(&self.worker);
445
446 // Signal shutdown
447 self.worker.handle.shutdown_core(core);
448 Err(())
449 }
450
run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult451 fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
452 let task = self.worker.handle.shared.owned.assert_owner(task);
453
454 // Make sure the worker is not in the **searching** state. This enables
455 // another idle worker to try to steal work.
456 core.transition_from_searching(&self.worker);
457
458 // Make the core available to the runtime context
459 core.metrics.incr_poll_count();
460 *self.core.borrow_mut() = Some(core);
461
462 // Run the task
463 coop::budget(|| {
464 task.run();
465
466 // As long as there is budget remaining and a task exists in the
467 // `lifo_slot`, then keep running.
468 loop {
469 // Check if we still have the core. If not, the core was stolen
470 // by another worker.
471 let mut core = match self.core.borrow_mut().take() {
472 Some(core) => core,
473 None => return Err(()),
474 };
475
476 // Check for a task in the LIFO slot
477 let task = match core.lifo_slot.take() {
478 Some(task) => task,
479 None => return Ok(core),
480 };
481
482 if coop::has_budget_remaining() {
483 // Run the LIFO task, then loop
484 core.metrics.incr_poll_count();
485 *self.core.borrow_mut() = Some(core);
486 let task = self.worker.handle.shared.owned.assert_owner(task);
487 task.run();
488 } else {
489 // Not enough budget left to run the LIFO task, push it to
490 // the back of the queue and return.
491 core.run_queue
492 .push_back(task, self.worker.inject(), &mut core.metrics);
493 return Ok(core);
494 }
495 }
496 })
497 }
498
maintenance(&self, mut core: Box<Core>) -> Box<Core>499 fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
500 if core.tick % self.worker.handle.shared.config.event_interval == 0 {
501 // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
502 // to run without actually putting the thread to sleep.
503 core = self.park_timeout(core, Some(Duration::from_millis(0)));
504
505 // Run regularly scheduled maintenance
506 core.maintenance(&self.worker);
507 }
508
509 core
510 }
511
512 /// Parks the worker thread while waiting for tasks to execute.
513 ///
514 /// This function checks if indeed there's no more work left to be done before parking.
515 /// Also important to notice that, before parking, the worker thread will try to take
516 /// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
517 /// Whenever a worker thread executes the Driver loop, all waken tasks are scheduled
518 /// in its own local queue until the queue saturates (ntasks > LOCAL_QUEUE_CAPACITY).
519 /// When the local queue is saturated, the overflow tasks are added to the injection queue
520 /// from where other workers can pick them up.
521 /// Also, we rely on the workstealing algorithm to spread the tasks amongst workers
522 /// after all the IOs get dispatched
park(&self, mut core: Box<Core>) -> Box<Core>523 fn park(&self, mut core: Box<Core>) -> Box<Core> {
524 if let Some(f) = &self.worker.handle.shared.config.before_park {
525 f();
526 }
527
528 if core.transition_to_parked(&self.worker) {
529 while !core.is_shutdown {
530 core.metrics.about_to_park();
531 core = self.park_timeout(core, None);
532 core.metrics.returned_from_park();
533
534 // Run regularly scheduled maintenance
535 core.maintenance(&self.worker);
536
537 if core.transition_from_parked(&self.worker) {
538 break;
539 }
540 }
541 }
542
543 if let Some(f) = &self.worker.handle.shared.config.after_unpark {
544 f();
545 }
546 core
547 }
548
park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core>549 fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
550 // Take the parker out of core
551 let mut park = core.park.take().expect("park missing");
552
553 // Store `core` in context
554 *self.core.borrow_mut() = Some(core);
555
556 // Park thread
557 if let Some(timeout) = duration {
558 park.park_timeout(&self.worker.handle.driver, timeout);
559 } else {
560 park.park(&self.worker.handle.driver);
561 }
562
563 wake_deferred_tasks();
564
565 // Remove `core` from context
566 core = self.core.borrow_mut().take().expect("core missing");
567
568 // Place `park` back in `core`
569 core.park = Some(park);
570
571 // If there are tasks available to steal, but this worker is not
572 // looking for tasks to steal, notify another worker.
573 if !core.is_searching && core.run_queue.is_stealable() {
574 self.worker.handle.notify_parked();
575 }
576
577 core
578 }
579 }
580
581 impl Core {
582 /// Increment the tick
tick(&mut self)583 fn tick(&mut self) {
584 self.tick = self.tick.wrapping_add(1);
585 }
586
587 /// Return the next notified task available to this worker.
next_task(&mut self, worker: &Worker) -> Option<Notified>588 fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
589 if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
590 worker.inject().pop().or_else(|| self.next_local_task())
591 } else {
592 self.next_local_task().or_else(|| worker.inject().pop())
593 }
594 }
595
next_local_task(&mut self) -> Option<Notified>596 fn next_local_task(&mut self) -> Option<Notified> {
597 self.lifo_slot.take().or_else(|| self.run_queue.pop())
598 }
599
600 /// Function responsible for stealing tasks from another worker
601 ///
602 /// Note: Only if less than half the workers are searching for tasks to steal
603 /// a new worker will actually try to steal. The idea is to make sure not all
604 /// workers will be trying to steal at the same time.
steal_work(&mut self, worker: &Worker) -> Option<Notified>605 fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
606 if !self.transition_to_searching(worker) {
607 return None;
608 }
609
610 let num = worker.handle.shared.remotes.len();
611 // Start from a random worker
612 let start = self.rand.fastrand_n(num as u32) as usize;
613
614 for i in 0..num {
615 let i = (start + i) % num;
616
617 // Don't steal from ourself! We know we don't have work.
618 if i == worker.index {
619 continue;
620 }
621
622 let target = &worker.handle.shared.remotes[i];
623 if let Some(task) = target
624 .steal
625 .steal_into(&mut self.run_queue, &mut self.metrics)
626 {
627 return Some(task);
628 }
629 }
630
631 // Fallback on checking the global queue
632 worker.handle.shared.inject.pop()
633 }
634
transition_to_searching(&mut self, worker: &Worker) -> bool635 fn transition_to_searching(&mut self, worker: &Worker) -> bool {
636 if !self.is_searching {
637 self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
638 }
639
640 self.is_searching
641 }
642
transition_from_searching(&mut self, worker: &Worker)643 fn transition_from_searching(&mut self, worker: &Worker) {
644 if !self.is_searching {
645 return;
646 }
647
648 self.is_searching = false;
649 worker.handle.transition_worker_from_searching();
650 }
651
652 /// Prepares the worker state for parking.
653 ///
654 /// Returns true if the transition happened, false if there is work to do first.
transition_to_parked(&mut self, worker: &Worker) -> bool655 fn transition_to_parked(&mut self, worker: &Worker) -> bool {
656 // Workers should not park if they have work to do
657 if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
658 return false;
659 }
660
661 // When the final worker transitions **out** of searching to parked, it
662 // must check all the queues one last time in case work materialized
663 // between the last work scan and transitioning out of searching.
664 let is_last_searcher = worker
665 .handle
666 .shared
667 .idle
668 .transition_worker_to_parked(worker.index, self.is_searching);
669
670 // The worker is no longer searching. Setting this is the local cache
671 // only.
672 self.is_searching = false;
673
674 if is_last_searcher {
675 worker.handle.notify_if_work_pending();
676 }
677
678 true
679 }
680
681 /// Returns `true` if the transition happened.
transition_from_parked(&mut self, worker: &Worker) -> bool682 fn transition_from_parked(&mut self, worker: &Worker) -> bool {
683 // If a task is in the lifo slot, then we must unpark regardless of
684 // being notified
685 if self.lifo_slot.is_some() {
686 // When a worker wakes, it should only transition to the "searching"
687 // state when the wake originates from another worker *or* a new task
688 // is pushed. We do *not* want the worker to transition to "searching"
689 // when it wakes when the I/O driver receives new events.
690 self.is_searching = !worker.handle.shared.idle.unpark_worker_by_id(worker.index);
691 return true;
692 }
693
694 if worker.handle.shared.idle.is_parked(worker.index) {
695 return false;
696 }
697
698 // When unparked, the worker is in the searching state.
699 self.is_searching = true;
700 true
701 }
702
703 /// Runs maintenance work such as checking the pool's state.
maintenance(&mut self, worker: &Worker)704 fn maintenance(&mut self, worker: &Worker) {
705 self.metrics
706 .submit(&worker.handle.shared.worker_metrics[worker.index]);
707
708 if !self.is_shutdown {
709 // Check if the scheduler has been shutdown
710 self.is_shutdown = worker.inject().is_closed();
711 }
712 }
713
714 /// Signals all tasks to shut down, and waits for them to complete. Must run
715 /// before we enter the single-threaded phase of shutdown processing.
pre_shutdown(&mut self, worker: &Worker)716 fn pre_shutdown(&mut self, worker: &Worker) {
717 // Signal to all tasks to shut down.
718 worker.handle.shared.owned.close_and_shutdown_all();
719
720 self.metrics
721 .submit(&worker.handle.shared.worker_metrics[worker.index]);
722 }
723
724 /// Shuts down the core.
shutdown(&mut self, handle: &Handle)725 fn shutdown(&mut self, handle: &Handle) {
726 // Take the core
727 let mut park = self.park.take().expect("park missing");
728
729 // Drain the queue
730 while self.next_local_task().is_some() {}
731
732 park.shutdown(&handle.driver);
733 }
734 }
735
736 impl Worker {
737 /// Returns a reference to the scheduler's injection queue.
inject(&self) -> &Inject<Arc<Handle>>738 fn inject(&self) -> &Inject<Arc<Handle>> {
739 &self.handle.shared.inject
740 }
741 }
742
743 // TODO: Move `Handle` impls into handle.rs
744 impl task::Schedule for Arc<Handle> {
release(&self, task: &Task) -> Option<Task>745 fn release(&self, task: &Task) -> Option<Task> {
746 self.shared.owned.remove(task)
747 }
748
schedule(&self, task: Notified)749 fn schedule(&self, task: Notified) {
750 self.schedule_task(task, false);
751 }
752
yield_now(&self, task: Notified)753 fn yield_now(&self, task: Notified) {
754 self.schedule_task(task, true);
755 }
756 }
757
758 impl Handle {
schedule_task(&self, task: Notified, is_yield: bool)759 pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
760 CURRENT.with(|maybe_cx| {
761 if let Some(cx) = maybe_cx {
762 // Make sure the task is part of the **current** scheduler.
763 if self.ptr_eq(&cx.worker.handle) {
764 // And the current thread still holds a core
765 if let Some(core) = cx.core.borrow_mut().as_mut() {
766 self.schedule_local(core, task, is_yield);
767 return;
768 }
769 }
770 }
771
772 // Otherwise, use the inject queue.
773 self.shared.inject.push(task);
774 self.shared.scheduler_metrics.inc_remote_schedule_count();
775 self.notify_parked();
776 })
777 }
778
schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool)779 fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
780 core.metrics.inc_local_schedule_count();
781
782 // Spawning from the worker thread. If scheduling a "yield" then the
783 // task must always be pushed to the back of the queue, enabling other
784 // tasks to be executed. If **not** a yield, then there is more
785 // flexibility and the task may go to the front of the queue.
786 let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
787 core.run_queue
788 .push_back(task, &self.shared.inject, &mut core.metrics);
789 true
790 } else {
791 // Push to the LIFO slot
792 let prev = core.lifo_slot.take();
793 let ret = prev.is_some();
794
795 if let Some(prev) = prev {
796 core.run_queue
797 .push_back(prev, &self.shared.inject, &mut core.metrics);
798 }
799
800 core.lifo_slot = Some(task);
801
802 ret
803 };
804
805 // Only notify if not currently parked. If `park` is `None`, then the
806 // scheduling is from a resource driver. As notifications often come in
807 // batches, the notification is delayed until the park is complete.
808 if should_notify && core.park.is_some() {
809 self.notify_parked();
810 }
811 }
812
close(&self)813 pub(super) fn close(&self) {
814 if self.shared.inject.close() {
815 self.notify_all();
816 }
817 }
818
notify_parked(&self)819 fn notify_parked(&self) {
820 if let Some(index) = self.shared.idle.worker_to_notify() {
821 self.shared.remotes[index].unpark.unpark(&self.driver);
822 }
823 }
824
notify_all(&self)825 fn notify_all(&self) {
826 for remote in &self.shared.remotes[..] {
827 remote.unpark.unpark(&self.driver);
828 }
829 }
830
notify_if_work_pending(&self)831 fn notify_if_work_pending(&self) {
832 for remote in &self.shared.remotes[..] {
833 if !remote.steal.is_empty() {
834 self.notify_parked();
835 return;
836 }
837 }
838
839 if !self.shared.inject.is_empty() {
840 self.notify_parked();
841 }
842 }
843
transition_worker_from_searching(&self)844 fn transition_worker_from_searching(&self) {
845 if self.shared.idle.transition_worker_from_searching() {
846 // We are the final searching worker. Because work was found, we
847 // need to notify another worker.
848 self.notify_parked();
849 }
850 }
851
852 /// Signals that a worker has observed the shutdown signal and has replaced
853 /// its core back into its handle.
854 ///
855 /// If all workers have reached this point, the final cleanup is performed.
shutdown_core(&self, core: Box<Core>)856 fn shutdown_core(&self, core: Box<Core>) {
857 let mut cores = self.shared.shutdown_cores.lock();
858 cores.push(core);
859
860 if cores.len() != self.shared.remotes.len() {
861 return;
862 }
863
864 debug_assert!(self.shared.owned.is_empty());
865
866 for mut core in cores.drain(..) {
867 core.shutdown(self);
868 }
869
870 // Drain the injection queue
871 //
872 // We already shut down every task, so we can simply drop the tasks.
873 while let Some(task) = self.shared.inject.pop() {
874 drop(task);
875 }
876 }
877
ptr_eq(&self, other: &Handle) -> bool878 fn ptr_eq(&self, other: &Handle) -> bool {
879 std::ptr::eq(self, other)
880 }
881 }
882
did_defer_tasks() -> bool883 fn did_defer_tasks() -> bool {
884 context::with_defer(|deferred| !deferred.is_empty()).unwrap()
885 }
886
wake_deferred_tasks()887 fn wake_deferred_tasks() {
888 context::with_defer(|deferred| deferred.wake());
889 }
890
891 cfg_metrics! {
892 impl Shared {
893 pub(super) fn injection_queue_depth(&self) -> usize {
894 self.inject.len()
895 }
896
897 pub(super) fn worker_local_queue_depth(&self, worker: usize) -> usize {
898 self.remotes[worker].steal.len()
899 }
900 }
901 }
902