//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
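//!
//! As a usage sketch only (assuming a running multi-threaded runtime), a task
//! can call `tokio::task::block_in_place` to run blocking code; while the
//! closure blocks, this worker's core is handed to another thread so the rest
//! of the scheduler keeps running:
//!
//! ```ignore
//! tokio::task::block_in_place(|| {
//!     // Blocking is fine here: the core backing this worker has been moved
//!     // to a freshly spawned thread.
//!     std::thread::sleep(std::time::Duration::from_millis(10));
//! });
//! ```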
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//! 1. The Shared::close method is called. This closes the inject queue and
//!    OwnedTasks instance and wakes up all worker threads.
//!
//! 2. Each worker thread observes the close signal next time it runs
//!    Core::maintenance by checking whether the inject queue is closed.
//!    The Core::is_shutdown flag is set to true.
//!
//! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
//!    will keep removing tasks from OwnedTasks until it is empty. No new
//!    tasks can be pushed to the OwnedTasks during or after this step as it
//!    was closed in step 1.
//!
//! 4. The workers call Shared::shutdown to enter the single-threaded phase of
//!    shutdown. These calls will push their core to Shared::shutdown_cores,
//!    and the last thread to push its core will finish the shutdown procedure.
//!
//! 5. The local run queue of each core is emptied, then the inject queue is
//!    emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//! * The spawner observes the OwnedTasks being open, and the inject queue is
//!   closed.
//! * The spawner observes the OwnedTasks being closed and doesn't check the
//!   inject queue.
//!
//! The first case can only happen if the OwnedTasks::bind call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the latter case, the task was not spawned and the task is immediately
//! cancelled by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and OwnedTasks
//! collection to have a closed bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the OwnedTasks,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 5 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
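//!
//! As an illustration of the two cases above (a sketch, not part of this
//! module; `rt` is an assumed `Runtime` value): a task spawned concurrently
//! with shutdown is either bound before step 1 and cleaned up in step 3, or
//! rejected and cancelled immediately by the spawner, so it is never leaked.
//!
//! ```ignore
//! let handle = rt.spawn(async { /* ... */ });
//! rt.shutdown_background();
//! // In either case the spawned future has been dropped, not leaked.
//! ```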

use crate::loom::sync::{Arc, Mutex};
use crate::runtime;
use crate::runtime::context;
use crate::runtime::scheduler::multi_thread::{
    idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
};
use crate::runtime::scheduler::{inject, Defer, Lock};
use crate::runtime::task::OwnedTasks;
use crate::runtime::{
    blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
};
use crate::util::atomic_cell::AtomicCell;
use crate::util::rand::{FastRand, RngSeedGenerator};

use std::cell::RefCell;
use std::task::Waker;
use std::time::Duration;

cfg_metrics! {
    mod metrics;
}

cfg_taskdump! {
    mod taskdump;
}

cfg_not_taskdump! {
    mod taskdump_mock;
}

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to scheduler's handle
    handle: Arc<Handle>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u32,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for improving locality which
    /// benefits message passing patterns and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
    /// they go to the back of the `run_queue`.
    lifo_enabled: bool,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Handle>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down
    is_shutdown: bool,

    /// True if the scheduler is being traced
    is_traced: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Per-worker runtime stats
    stats: Stats,

    /// How often to check the global queue
    global_queue_interval: u32,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(crate) struct Shared {
    /// Per-worker remote state. All workers have access to this, and it is
    /// how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Global task queue used to:
    /// 1. Submit work to the scheduler while **not** currently on a worker thread.
    /// 2. Submit work to the scheduler when a worker run queue is saturated.
    pub(super) inject: inject::Shared<Arc<Handle>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    pub(crate) owned: OwnedTasks<Arc<Handle>>,

    /// Data synchronized by the scheduler mutex
    pub(super) synced: Mutex<Synced>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// The number of cores that have observed the trace signal.
    pub(super) trace_status: TraceStatus,

    /// Scheduler configuration options
    config: Config,

    /// Collects metrics from the runtime.
    pub(super) scheduler_metrics: SchedulerMetrics,

    pub(super) worker_metrics: Box<[WorkerMetrics]>,

    /// Only held to trigger some code on drop. This is used to get internal
    /// runtime metrics that can be useful when doing performance
    /// investigations. This does nothing (empty struct, no drop impl) unless
    /// the `tokio_internal_mt_counters` cfg flag is set.
    _counters: Counters,
}

/// Data synchronized by the scheduler mutex
pub(crate) struct Synced {
    /// Synchronized state for `Idle`.
    pub(super) idle: idle::Synced,

    /// Synchronized state for `Inject`.
    pub(crate) inject: inject::Synced,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steals tasks from this worker.
    pub(super) steal: queue::Steal<Arc<Handle>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
pub(crate) struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,

    /// Tasks to wake after resource drivers are polled. This is mostly to
    /// handle yielded tasks.
    pub(crate) defer: Defer,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Handle>>;

/// A notified task handle
type Notified = task::Notified<Arc<Handle>>;

/// Value picked out of thin air. Running the LIFO slot a handful of times
/// seems sufficient to benefit from locality. Running it more than 3 times is
/// probably overkill. The value can be tuned in the future with data that
/// shows improvements.
const MAX_LIFO_POLLS_PER_TICK: usize = 3;

pub(super) fn create(
    size: usize,
    park: Parker,
    driver_handle: driver::Handle,
    blocking_spawner: blocking::Spawner,
    seed_generator: RngSeedGenerator,
    config: Config,
) -> (Arc<Handle>, Launch) {
    let mut cores = Vec::with_capacity(size);
    let mut remotes = Vec::with_capacity(size);
    let mut worker_metrics = Vec::with_capacity(size);

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();
        let metrics = WorkerMetrics::from_config(&config);
        let stats = Stats::new(&metrics);

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            lifo_enabled: !config.disable_lifo_slot,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            is_traced: false,
            park: Some(park),
            global_queue_interval: stats.tuned_global_queue_interval(&config),
            stats,
            rand: FastRand::from_seed(config.seed_generator.next_seed()),
        }));

        remotes.push(Remote { steal, unpark });
        worker_metrics.push(metrics);
    }

    let (idle, idle_synced) = Idle::new(size);
    let (inject, inject_synced) = inject::Shared::new();

    let remotes_len = remotes.len();
    let handle = Arc::new(Handle {
        shared: Shared {
            remotes: remotes.into_boxed_slice(),
            inject,
            idle,
            owned: OwnedTasks::new(),
            synced: Mutex::new(Synced {
                idle: idle_synced,
                inject: inject_synced,
            }),
            shutdown_cores: Mutex::new(vec![]),
            trace_status: TraceStatus::new(remotes_len),
            config,
            scheduler_metrics: SchedulerMetrics::new(),
            worker_metrics: worker_metrics.into_boxed_slice(),
            _counters: Counters,
        },
        driver: driver_handle,
        blocking_spawner,
        seed_generator,
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            handle: handle.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (handle, launch)
}

#[track_caller]
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset {
        take_core: bool,
        budget: coop::Budget,
    }

    impl Drop for Reset {
        fn drop(&mut self) {
            with_current(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    if self.take_core {
                        let core = cx.worker.core.take();
                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;
                    }

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.budget);
                }
            });
        }
    }

    let mut had_entered = false;
    let mut take_core = false;

    let setup_result = with_current(|maybe_cx| {
        match (
            crate::runtime::context::current_enter_context(),
            maybe_cx.is_some(),
        ) {
            (context::EnterRuntime::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (
                context::EnterRuntime::Entered {
                    allow_block_in_place,
                },
                false,
            ) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_block_in_place {
                    had_entered = true;
                    return Ok(());
                } else {
                    // This probably means we are on the current_thread runtime or in a
                    // LocalSet, where it is _not_ okay to block.
                    return Err(
                        "can call blocking only when running on the multi-threaded runtime",
                    );
                }
            }
            (context::EnterRuntime::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return Ok(());
            }
            (context::EnterRuntime::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return Ok(());
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return Ok(()),
        };

        // We are taking the core from the context and sending it to another
        // thread.
        take_core = true;

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
        Ok(())
    });

    if let Err(panic_message) = setup_result {
        panic!("{}", panic_message);
    }

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset {
            take_core,
            budget: coop::stop(),
        };

        crate::runtime::context::exit_runtime(f)
    } else {
        f()
    }
}

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    struct AbortOnPanic;

    impl Drop for AbortOnPanic {
        fn drop(&mut self) {
            if std::thread::panicking() {
                eprintln!("worker thread panicking; aborting process");
                std::process::abort();
            }
        }
    }

    // Catching panics on worker threads in tests is quite tricky. Instead, when
    // debug assertions are enabled, we just abort the process.
    #[cfg(debug_assertions)]
    let _abort_on_panic = AbortOnPanic;

    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    let handle = scheduler::Handle::MultiThread(worker.handle.clone());

    crate::runtime::context::enter_runtime(&handle, true, |_| {
        // Set the worker context.
        let cx = scheduler::Context::MultiThread(Context {
            worker,
            core: RefCell::new(None),
            defer: Defer::new(),
        });

        context::set_scheduler(&cx, || {
            let cx = cx.expect_multi_thread();

            // This should always be an error. It only returns a `Result` to support
            // using `?` to short circuit.
            assert!(cx.run(core).is_err());

            // Check if there are any deferred tasks to notify. This can happen when
            // the worker core is lost due to `block_in_place()` being called from
            // within the task.
            cx.defer.wake();
        });
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        // Reset `lifo_enabled` here in case the core was previously stolen from
        // a task that had the LIFO slot disabled.
        self.reset_lifo_enabled(&mut core);

        // Start in the "processing scheduled tasks" state, as polling tasks
        // from the local queue will be one of the first things we do.
        core.stats.start_processing_scheduled_tasks();

        while !core.is_shutdown {
            self.assert_lifo_enabled_is_correct(&core);

            if core.is_traced {
                core = self.worker.handle.trace_core(core);
            }

            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // We consumed all work in the queues and will start searching for work.
            core.stats.end_processing_scheduled_tasks();

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                // Found work, switch back to processing
                core.stats.start_processing_scheduled_tasks();
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = if !self.defer.is_empty() {
                    self.park_timeout(core, Some(Duration::from_millis(0)))
                } else {
                    self.park(core)
                };
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.handle.shutdown_core(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        let task = self.worker.handle.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        self.assert_lifo_enabled_is_correct(&core);

        // Measure the poll start time. Note that we may end up polling other
        // tasks under this measurement. In this case, the tasks came from the
        // LIFO slot and are considered part of the current task for scheduling
        // purposes. These tasks inherit the "parent"'s limits.
        core.stats.start_poll();

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            task.run();
            let mut lifo_polls = 0;

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => {
                        // In this case, we cannot call `reset_lifo_enabled()`
                        // because the core was stolen. The stealer will handle
                        // that at the top of `Context::run`.
                        return Err(());
                    }
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => {
                        self.reset_lifo_enabled(&mut core);
                        core.stats.end_poll();
                        return Ok(core);
                    }
                };

                if !coop::has_budget_remaining() {
                    core.stats.end_poll();

                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back_or_overflow(
                        task,
                        &*self.worker.handle,
                        &mut core.stats,
                    );
                    // If we hit this point, the LIFO slot should be enabled.
                    // There is no need to reset it.
                    debug_assert!(core.lifo_enabled);
                    return Ok(core);
                }

                // Track that we are about to run a task from the LIFO slot.
                lifo_polls += 1;
                super::counters::inc_lifo_schedules();

                // Disable the LIFO slot if we reach our limit
                //
                // In ping-pong style workloads where task A notifies task B,
                // which notifies task A again, continuously prioritizing the
                // LIFO slot can cause starvation as these two tasks will
                // repeatedly schedule the other. To mitigate this, we limit the
                // number of times the LIFO slot is prioritized.
                if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
                    core.lifo_enabled = false;
                    super::counters::inc_lifo_capped();
                }

                // Run the LIFO task, then loop
                *self.core.borrow_mut() = Some(core);
                let task = self.worker.handle.shared.owned.assert_owner(task);
                task.run();
            }
        })
    }

    fn reset_lifo_enabled(&self, core: &mut Core) {
        core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
    }

    fn assert_lifo_enabled_is_correct(&self, core: &Core) {
        debug_assert_eq!(
            core.lifo_enabled,
            !self.worker.handle.shared.config.disable_lifo_slot
        );
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % self.worker.handle.shared.config.event_interval == 0 {
            super::counters::inc_num_maintenance();

            core.stats.end_processing_scheduled_tasks();

            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            core.stats.start_processing_scheduled_tasks();
        }

        core
    }

    /// Parks the worker thread while waiting for tasks to execute.
    ///
    /// This function first checks that there is indeed no more work left to do
    /// before parking. Note that before parking, the worker thread will try to
    /// take ownership of the Driver (IO/Time) and dispatch any events that
    /// might have fired. Whenever a worker thread executes the Driver loop,
    /// all woken tasks are scheduled in its own local queue until the queue
    /// saturates (ntasks > LOCAL_QUEUE_CAPACITY). When the local queue is
    /// saturated, the overflow tasks are added to the injection queue from
    /// where other workers can pick them up. Finally, we rely on the
    /// work-stealing algorithm to spread the tasks amongst workers after all
    /// the I/O events are dispatched.
    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        if let Some(f) = &self.worker.handle.shared.config.before_park {
            f();
        }

        if core.transition_to_parked(&self.worker) {
            while !core.is_shutdown && !core.is_traced {
                core.stats.about_to_park();
                core = self.park_timeout(core, None);

                // Run regularly scheduled maintenance
                core.maintenance(&self.worker);

                if core.transition_from_parked(&self.worker) {
                    break;
                }
            }
        }

        if let Some(f) = &self.worker.handle.shared.config.after_unpark {
            f();
        }
        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        self.assert_lifo_enabled_is_correct(&core);

        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(&self.worker.handle.driver, timeout);
        } else {
            park.park(&self.worker.handle.driver);
        }

        self.defer.wake();

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        if core.should_notify_others() {
            self.worker.handle.notify_parked_local();
        }

        core
    }

    pub(crate) fn defer(&self, waker: &Waker) {
        self.defer.defer(waker);
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % self.global_queue_interval == 0 {
            // Update the global queue interval, if needed
            self.tune_global_queue_interval(worker);

            worker
                .handle
                .next_remote_task()
                .or_else(|| self.next_local_task())
        } else {
            let maybe_task = self.next_local_task();

            if maybe_task.is_some() {
                return maybe_task;
            }

            if worker.inject().is_empty() {
                return None;
            }

            // Other threads can only **remove** tasks from the current worker's
            // `run_queue`. So, we can be confident that by the time we call
            // `run_queue.push_back` below, there will be *at least* `cap`
            // available slots in the queue.
            let cap = usize::min(
                self.run_queue.remaining_slots(),
                self.run_queue.max_capacity() / 2,
            );

            // The worker is currently idle, pull a batch of work from the
            // injection queue. We don't want to pull *all* the work so other
            // workers can also get some.
            let n = usize::min(
                worker.inject().len() / worker.handle.shared.remotes.len() + 1,
                cap,
            );

            // Take at least one task since the first task is returned directly
            // and not pushed onto the local queue.
            let n = usize::max(1, n);

            let mut synced = worker.handle.shared.synced.lock();
            // safety: passing in the correct `inject::Synced`.
            let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };

            // Pop the first task to return immediately
            let ret = tasks.next();

            // Push the rest of the tasks onto the run queue
            self.run_queue.push_back(tasks);

            ret
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    /// Function responsible for stealing tasks from another worker
    ///
    /// Note: a worker will only attempt to steal if fewer than half of the
    /// workers are already searching for tasks to steal. The idea is to make
    /// sure not all workers will be trying to steal at the same time.
    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.handle.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.handle.shared.remotes[i];
            if let Some(task) = target
                .steal
                .steal_into(&mut self.run_queue, &mut self.stats)
            {
                return Some(task);
            }
        }

        // Fallback on checking the global queue
        worker.handle.next_remote_task()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.handle.transition_worker_from_searching();
    }

    fn has_tasks(&self) -> bool {
        self.lifo_slot.is_some() || self.run_queue.has_tasks()
    }

    fn should_notify_others(&self) -> bool {
        // If there are tasks available to steal, but this worker is not
        // looking for tasks to steal, notify another worker.
        if self.is_searching {
            return false;
        }
        self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
    }

    /// Prepares the worker state for parking.
    ///
    /// Returns true if the transition happened, false if there is work to do first.
    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
        // Workers should not park if they have work to do
        if self.has_tasks() || self.is_traced {
            return false;
        }

        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
            &worker.handle.shared,
            worker.index,
            self.is_searching,
        );

        // The worker is no longer searching. Setting this only updates the
        // local cache.
        self.is_searching = false;

        if is_last_searcher {
            worker.handle.notify_if_work_pending();
        }

        true
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot/run queue, then we must unpark regardless of
        // being notified
        if self.has_tasks() {
            // When a worker wakes, it should only transition to the "searching"
            // state when the wake originates from another worker *or* a new task
            // is pushed. We do *not* want the worker to transition to "searching"
            // when it wakes when the I/O driver receives new events.
            self.is_searching = !worker
                .handle
                .shared
                .idle
                .unpark_worker_by_id(&worker.handle.shared, worker.index);
            return true;
        }

        if worker
            .handle
            .shared
            .idle
            .is_parked(&worker.handle.shared, worker.index)
        {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as checking the pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);

        if !self.is_shutdown {
            // Check if the scheduler has been shut down
            let synced = worker.handle.shared.synced.lock();
            self.is_shutdown = worker.inject().is_closed(&synced.inject);
        }

        if !self.is_traced {
            // Check if the worker should be tracing.
            self.is_traced = worker.handle.shared.trace_status.trace_requested();
        }
    }

    /// Signals all tasks to shut down, and waits for them to complete. Must run
    /// before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // Signal to all tasks to shut down.
        worker.handle.shared.owned.close_and_shutdown_all();

        self.stats
            .submit(&worker.handle.shared.worker_metrics[worker.index]);
    }

    /// Shuts down the core.
    fn shutdown(&mut self, handle: &Handle) {
        // Take the parker
        let mut park = self.park.take().expect("park missing");

        // Drain the queue
        while self.next_local_task().is_some() {}

        park.shutdown(&handle.driver);
    }

    fn tune_global_queue_interval(&mut self, worker: &Worker) {
        let next = self
            .stats
            .tuned_global_queue_interval(&worker.handle.shared.config);

        debug_assert!(next > 1);

        // Smooth out jitter
        if abs_diff(self.global_queue_interval, next) > 2 {
            self.global_queue_interval = next;
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue.
    fn inject(&self) -> &inject::Shared<Arc<Handle>> {
        &self.handle.shared.inject
    }
}

// TODO: Move `Handle` impls into handle.rs
impl task::Schedule for Arc<Handle> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.shared.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        self.schedule_task(task, false);
    }

    fn yield_now(&self, task: Notified) {
        self.schedule_task(task, true);
    }
}

impl Handle {
    pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
        with_current(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.handle) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue.
            self.push_remote_task(task);
            self.notify_parked_remote();
        })
    }

    pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
        if let Some(task) = task {
            self.schedule_task(task, false);
        }
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        core.stats.inc_local_schedule_count();

        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield || !core.lifo_enabled {
            core.run_queue
                .push_back_or_overflow(task, self, &mut core.stats);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue
                    .push_back_or_overflow(prev, self, &mut core.stats);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked_local();
        }
    }

    fn next_remote_task(&self) -> Option<Notified> {
        if self.shared.inject.is_empty() {
            return None;
        }

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
        unsafe { self.shared.inject.pop(&mut synced.inject) }
    }

    fn push_remote_task(&self, task: Notified) {
        self.shared.scheduler_metrics.inc_remote_schedule_count();

        let mut synced = self.shared.synced.lock();
        // safety: passing in the correct `inject::Synced`
        unsafe {
            self.shared.inject.push(&mut synced.inject, task);
        }
    }

    pub(super) fn close(&self) {
        if self
            .shared
            .inject
            .close(&mut self.shared.synced.lock().inject)
        {
            self.notify_all();
        }
    }

    fn notify_parked_local(&self) {
        super::counters::inc_num_inc_notify_local();

        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            super::counters::inc_num_unparks_local();
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    fn notify_parked_remote(&self) {
        if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
            self.shared.remotes[index].unpark.unpark(&self.driver);
        }
    }

    pub(super) fn notify_all(&self) {
        for remote in &self.shared.remotes[..] {
            remote.unpark.unpark(&self.driver);
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.shared.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked_local();
                return;
            }
        }

        if !self.shared.inject.is_empty() {
            self.notify_parked_local();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.shared.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked_local();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has replaced
    /// its core back into its handle.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown_core(&self, core: Box<Core>) {
        let mut cores = self.shared.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.shared.remotes.len() {
            return;
        }

        debug_assert!(self.shared.owned.is_empty());

        for mut core in cores.drain(..) {
            core.shutdown(self);
        }

        // Drain the injection queue
        //
        // We already shut down every task, so we can simply drop the tasks.
        while let Some(task) = self.next_remote_task() {
            drop(task);
        }
    }

    fn ptr_eq(&self, other: &Handle) -> bool {
        std::ptr::eq(self, other)
    }
}

impl Overflow<Arc<Handle>> for Handle {
    fn push(&self, task: task::Notified<Arc<Handle>>) {
        self.push_remote_task(task);
    }

    fn push_batch<I>(&self, iter: I)
    where
        I: Iterator<Item = task::Notified<Arc<Handle>>>,
    {
        unsafe {
            self.shared.inject.push_batch(self, iter);
        }
    }
}

pub(crate) struct InjectGuard<'a> {
    lock: crate::loom::sync::MutexGuard<'a, Synced>,
}

impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
    fn as_mut(&mut self) -> &mut inject::Synced {
        &mut self.lock.inject
    }
}

impl<'a> Lock<inject::Synced> for &'a Handle {
    type Handle = InjectGuard<'a>;

    fn lock(self) -> Self::Handle {
        InjectGuard {
            lock: self.shared.synced.lock(),
        }
    }
}

#[track_caller]
fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
    use scheduler::Context::MultiThread;

    context::with_scheduler(|ctx| match ctx {
        Some(MultiThread(ctx)) => f(Some(ctx)),
        _ => f(None),
    })
}

// `u32::abs_diff` is not available on Tokio's MSRV.
fn abs_diff(a: u32, b: u32) -> u32 {
    if a > b {
        a - b
    } else {
        b - a
    }
}