//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
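//!
//! For illustration, a minimal sketch of triggering this hand-off through the
//! public API (assumes the multi-threaded runtime with the `rt-multi-thread`
//! and `macros` features enabled):
//!
//! ```ignore
//! #[tokio::main]
//! async fn main() {
//!     tokio::task::spawn(async {
//!         // While this closure blocks, the worker's core is moved to a new
//!         // thread so other tasks keep running on this scheduler.
//!         tokio::task::block_in_place(|| {
//!             std::thread::sleep(std::time::Duration::from_millis(10));
//!         });
//!     })
//!     .await
//!     .unwrap();
//! }
//! ```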
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//!  1. The Shared::close method is called. This closes the inject queue and
//!     OwnedTasks instance and wakes up all worker threads.
//!
//!  2. Each worker thread observes the close signal next time it runs
//!     Core::maintenance by checking whether the inject queue is closed.
//!     The Core::is_shutdown flag is set to true.
//!
//!  3. The worker threads call `pre_shutdown` in parallel. Here, each worker
//!     will keep removing tasks from OwnedTasks until it is empty. No new
//!     tasks can be pushed to the OwnedTasks during or after this step as it
//!     was closed in step 1.
//!
//!  4. The workers call Shared::shutdown to enter the single-threaded phase of
//!     shutdown. These calls will push their core to Shared::shutdown_cores,
//!     and the last thread to push its core will finish the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//!  * The spawner observes the OwnedTasks being open and the inject queue
//!    being closed.
//!  * The spawner observes the OwnedTasks being closed and does not check the
//!    inject queue.
//!
//! The first case can only happen if the OwnedTasks::bind call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the second case, the task was never spawned and is immediately cancelled
//! by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and the OwnedTasks
//! collection to have a closed bit. With a closed bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a closed bit on only the OwnedTasks,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
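//!
//! A simplified sketch of the spawn-side checks described above (not the
//! actual code; the names are illustrative):
//!
//! ```text
//! if owned_tasks.is_closed() {
//!     // Case 2: never bound; the spawner cancels the task immediately.
//!     task.shutdown();
//! } else {
//!     let notified = owned_tasks.bind(task);  // checks the OwnedTasks bit
//!     if inject.push(notified).is_err() {     // checks the inject bit
//!         // Case 1: the task is bound, but the inject queue is closed.
//!         // Step 3 of shutdown removes it from the OwnedTasks.
//!     }
//! }
//! ```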

use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::context;
use crate::runtime::scheduler::multi_thread::{queue, Handle, Idle, Parker, Unparker};
use crate::runtime::task::{Inject, OwnedTasks};
use crate::runtime::{
    blocking, coop, driver, scheduler, task, Config, MetricsBatch, SchedulerMetrics, WorkerMetrics,
};
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::time::Duration;

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for message passing patterns
    /// and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down.
    is_shutdown: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Batching metrics so they can be submitted to RuntimeMetrics.
    metrics: MetricsBatch,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(super) struct Shared {
    /// Per-worker remote state. All workers have access to this, and it is
    /// how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Global task queue used for:
    ///  1. Submitting work to the scheduler while **not** currently on a worker thread.
    ///  2. Submitting work to the scheduler when a worker run queue is saturated.
    inject: Inject<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(super) owned: OwnedTasks<Arc<Handle>>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    pub(super) worker_metrics: Box<[WorkerMetrics]>,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steals tasks from this worker.
    steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// the task completes, it is returned. Otherwise, the worker will need to
/// stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Handle>>;

/// A notified task handle
type Notified = task::Notified<Arc<Handle>>;

// Tracks thread-local state. `CURRENT` is set for the duration of a worker's
// `run` and is read by `schedule_task` and `block_in_place` to reach the
// worker's `Core`.
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            park: Some(park),
            metrics: MetricsBatch::new(),
            rand: FastRand::new(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal, unpark });
        worker_metrics.push(WorkerMetrics::new());
    }

    let handle = Arc::new(Handle {
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject: Inject::new(),
            idle: Idle::new(size),
            owned: OwnedTasks::new(),
            shutdown_cores: Mutex::new(vec![]),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}

#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset(coop::Budget);

    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT.with(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    let core = cx.worker.core.take();
                    let mut cx_core = cx.core.borrow_mut();
                    assert!(cx_core.is_none());
                    *cx_core = core;

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.0);
                }
            });
        }
    }

    let mut had_entered = false;

    let setup_result = CURRENT.with(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset(coop::stop());

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead, when
    // debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());
    let _enter = crate::runtime::context::enter_runtime(&handle, true);

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to support
        // using `?` to short circuit.
        assert!(cx.run(core).is_err());

        // Check if there are any deferred tasks to notify. This can happen when
        // the worker core is lost due to `block_in_place()` being called from
        // within the task.
        wake_deferred_tasks();
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if did_defer_tasks() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        core.metrics.incr_poll_count();
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            task.run();

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    core.metrics.incr_poll_count();
                    *self.core.borrow_mut() = Some(core);
                    let task = self.worker.handle.shared.owned.assert_owner(task);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue
                        .push_back(task, self.worker.inject(), &mut core.metrics);
                    return Ok(core);
                }
            }
        })
    }

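    /// Runs periodic bookkeeping if the tick warrants it: every
    /// `event_interval` ticks, the driver is polled with a zero timeout so
    /// I/O and timer events are dispatched without putting the thread to
    /// sleep, and `Core::maintenance` is run.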
    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);
        }

        core
    }

    /// Parks the worker thread while waiting for tasks to execute.
    ///
    /// This function checks that there is indeed no more work left to be done
    /// before parking. Note that, before parking, the worker thread will try
    /// to take ownership of the driver (I/O, time) and dispatch any events
    /// that might have fired. When a worker thread runs the driver loop, each
    /// woken task is scheduled onto the worker's own local queue until the
    /// queue saturates (ntasks > LOCAL_QUEUE_CAPACITY). Once the local queue
    /// is saturated, overflowing tasks are pushed to the injection queue,
    /// from which other workers can pick them up. After all the I/O events
    /// are dispatched, the work-stealing algorithm spreads the tasks among
    /// workers.
    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        if let Some(f) = &self.worker.handle.shared.config.before_park {
            f();
        }

        if core.transition_to_parked(&self.worker) {
            while !core.is_shutdown {
                core.metrics.about_to_park();
                core = self.park_timeout(core, None);
                core.metrics.returned_from_park();

                // Run regularly scheduled maintenance
                core.maintenance(&self.worker);

                if core.transition_from_parked(&self.worker) {
                    break;
                }
            }
        }

        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
            f();
        }
        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(&self.worker.handle.driver, timeout);
        } else {
            park.park(&self.worker.handle.driver);
        }

        wake_deferred_tasks();

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        // If there are tasks available to steal, but this worker is not
        // looking for tasks to steal, notify another worker.
        if !core.is_searching && core.run_queue.is_stealable() {
            self.worker.handle.notify_parked();
        }

        core
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % worker.handle.shared.config.global_queue_interval == 0 {
            worker.inject().pop().or_else(|| self.next_local_task())
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }

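    /// Returns the next task from the LIFO slot, falling back to the local
    /// run queue.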
    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    /// Function responsible for stealing tasks from another worker
    ///
    /// Note: a worker will only try to steal if fewer than half of the
    /// workers are already searching for tasks to steal. The idea is to make
    /// sure not all workers try to steal at the same time.
    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.handle.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.handle.shared.remotes[i];
            if let Some(task) = target
                .steal
                .steal_into(&mut self.run_queue, &mut self.metrics)
            {
                return Some(task);
            }
        }

        // Fallback on checking the global queue
        worker.handle.shared.inject.pop()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.handle.transition_worker_from_searching();
    }

    /// Prepares the worker state for parking.
    ///
    /// Returns true if the transition happened, false if there is work to do first.
    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
        // Workers should not park if they have work to do
        if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
            return false;
        }

        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker
            .handle
            .shared
            .idle
            .transition_worker_to_parked(worker.index, self.is_searching);

        // The worker is no longer searching. Setting this only updates the
        // local cache.
        self.is_searching = false;

        if is_last_searcher {
            worker.handle.notify_if_work_pending();
        }

        true
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot, then we must unpark regardless of
        // being notified
        if self.lifo_slot.is_some() {
            // When a worker wakes, it should only transition to the "searching"
            // state when the wake originates from another worker *or* a new task
            // is pushed. We do *not* want the worker to transition to "searching"
            // when it wakes when the I/O driver receives new events.
            self.is_searching = !worker.handle.shared.idle.unpark_worker_by_id(worker.index);
            return true;
        }

        if worker.handle.shared.idle.is_parked(worker.index) {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as checking the pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.metrics
            .submit(&worker.handle.shared.worker_metrics[worker.index]);

        if !self.is_shutdown {
            // Check if the scheduler has been shut down
            self.is_shutdown = worker.inject().is_closed();
        }
    }

    /// Signals all tasks to shut down, and waits for them to complete. Must run
    /// before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // Signal to all tasks to shut down.
        worker.handle.shared.owned.close_and_shutdown_all();

        self.metrics
            .submit(&worker.handle.shared.worker_metrics[worker.index]);
    }

    /// Shuts down the core.
    fn shutdown(&mut self, handle: &Handle) {
        // Take the parker out of the core
        let mut park = self.park.take().expect("park missing");

        // Drain the queue
        while self.next_local_task().is_some() {}

        park.shutdown(&handle.driver);
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue.
    fn inject(&self) -> &Inject<Arc<Handle>> {
        &self.handle.shared.inject
    }
}

// TODO: Move `Handle` impls into handle.rs
impl task::Schedule for Arc<Handle> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        self.schedule_task(task, false);
    }

    fn yield_now(&self, task: Notified) {
        self.schedule_task(task, true);
    }
}

impl Handle {
    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.handle) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue.
            self.shared.inject.push(task);
            self.shared.scheduler_metrics.inc_remote_schedule_count();
            self.notify_parked();
        })
    }

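    /// Schedules a task from the current worker thread: yields go to the back
    /// of the local run queue, while other tasks go to the LIFO slot (unless
    /// it is disabled), displacing any previous occupant to the run queue.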
    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        core.metrics.inc_local_schedule_count();

        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield || self.shared.config.disable_lifo_slot {
            core.run_queue
                .push_back(task, &self.shared.inject, &mut core.metrics);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue
                    .push_back(prev, &self.shared.inject, &mut core.metrics);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }

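    /// Closes the inject queue (step 1 of shutdown). If this call is the one
    /// that closes it, all workers are notified so they observe the signal.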
    pub(super) fn close(&self) {
        if self.shared.inject.close() {
            self.notify_all();
        }
    }

    fn notify_parked(&self) {
        if let Some(index) = self.shared.idle.worker_to_notify() {
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    fn notify_all(&self) {
        for remote in &self.shared.remotes[..] {
            remote.unpark.unpark(&self.driver);
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.shared.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked();
                return;
            }
        }

        if !self.shared.inject.is_empty() {
            self.notify_parked();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.shared.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has replaced
    /// its core back into its handle.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown_core(&self, core: Box<Core>) {
        let mut cores = self.shared.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.shared.remotes.len() {
            return;
        }

        debug_assert!(self.shared.owned.is_empty());

        for mut core in cores.drain(..) {
            core.shutdown(self);
        }

        // Drain the injection queue
        //
        // We already shut down every task, so we can simply drop the tasks.
        while let Some(task) = self.shared.inject.pop() {
            drop(task);
        }
    }

    fn ptr_eq(&self, other: &Handle) -> bool {
        std::ptr::eq(self, other)
    }
}

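// Returns `true` if the current thread has deferred task wakeups pending.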
fn did_defer_tasks() -> bool {
    context::with_defer(|deferred| !deferred.is_empty()).unwrap()
}

fn wake_deferred_tasks() {
    context::with_defer(|deferred| deferred.wake());
}

cfg_metrics! {
    impl Shared {
        pub(super) fn injection_queue_depth(&self) -> usize {
            self.inject.len()
        }

        pub(super) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.remotes[worker].steal.len()
        }
    }
}