1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
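//!
//! As a hedged illustration of that hand-off from the user's side (using only
//! the public `tokio::task::block_in_place` API, not the internals in this
//! file):
//!
//! ```ignore
//! #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
//! async fn main() {
//!     tokio::spawn(async {
//!         // Keeps getting polled on another worker while the blocking call runs.
//!     });
//!
//!     tokio::task::block_in_place(|| {
//!         // The current worker's core is handed to a fresh thread, so the
//!         // scheduler keeps making progress while this closure blocks.
//!         std::thread::sleep(std::time::Duration::from_millis(50));
//!     });
//! }
//! ```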
6 //!
7 //! # Shutdown
8 //!
9 //! Shutting down the runtime involves the following steps:
10 //!
11 //!  1. The Shared::close method is called. This closes the inject queue and
12 //!     OwnedTasks instance and wakes up all worker threads.
13 //!
14 //!  2. Each worker thread observes the close signal next time it runs
15 //!     Core::maintenance by checking whether the inject queue is closed.
16 //!     The Core::is_shutdown flag is set to true.
17 //!
18 //!  3. Each worker thread calls `pre_shutdown` in parallel. Here, the worker
19 //!     will keep removing tasks from OwnedTasks until it is empty. No new
20 //!     tasks can be pushed to the OwnedTasks during or after this step, as it
21 //!     was closed in step 1.
22 //!
23 //!  4. The workers call Shared::shutdown to enter the single-threaded phase of
24 //!     shutdown. These calls will push their core to Shared::shutdown_cores,
25 //!     and the last thread to push its core will finish the shutdown procedure.
26 //!
27 //!  5. The local run queue of each core is emptied, then the inject queue is
28 //!     emptied.
29 //!
30 //! At this point, shutdown has completed. It is not possible for any of the
31 //! collections to contain any tasks, as each collection was closed first and
32 //! then emptied afterwards.
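//!
//! For orientation, this sequence is what ultimately runs when user code tears
//! a runtime down. A sketch using only public APIs (the exact builder options
//! are illustrative):
//!
//! ```ignore
//! use std::time::Duration;
//! use tokio::runtime::Builder;
//!
//! fn main() {
//!     let rt = Builder::new_multi_thread()
//!         .worker_threads(2)
//!         .enable_all()
//!         .build()
//!         .unwrap();
//!
//!     rt.spawn(async {
//!         loop {
//!             tokio::task::yield_now().await;
//!         }
//!     });
//!
//!     // Kicks off the steps above: the inject queue and OwnedTasks are
//!     // closed, workers observe the signal, and the queues are drained.
//!     rt.shutdown_timeout(Duration::from_secs(1));
//! }
//! ```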
33 //!
34 //! ## Spawns during shutdown
35 //!
36 //! When spawning tasks during shutdown, there are two cases:
37 //!
38 //!  * The spawner observes the OwnedTasks being open, and the inject queue is
39 //!    closed.
40 //!  * The spawner observes the OwnedTasks being closed and doesn't check the
41 //!    inject queue.
42 //!
43 //! The first case can only happen if the OwnedTasks::bind call happens before
44 //! or during step 1 of shutdown. In this case, the runtime will clean up the
45 //! task in step 3 of shutdown.
46 //!
47 //! In the latter case, the task was not spawned and is immediately cancelled
48 //! by the spawner.
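//!
//! A sketch of that second case from the user's point of view (assuming the
//! usual behavior that a task spawned onto an already shut down runtime is
//! cancelled rather than run):
//!
//! ```ignore
//! use tokio::runtime::Builder;
//!
//! fn main() {
//!     let rt = Builder::new_multi_thread().worker_threads(1).build().unwrap();
//!     let handle = rt.handle().clone();
//!     drop(rt); // runs the shutdown sequence described above
//!
//!     // The spawner observes the closed OwnedTasks, so the task is never
//!     // bound; its JoinHandle reports cancellation when awaited.
//!     let join = handle.spawn(async { 42 });
//!
//!     let probe = Builder::new_current_thread().build().unwrap();
//!     assert!(probe.block_on(join).unwrap_err().is_cancelled());
//! }
//! ```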
49 //!
50 //! The correctness of shutdown requires both the inject queue and OwnedTasks
51 //! collection to have a closed bit. With a close bit on only the inject queue,
52 //! spawning could run into a situation where a task is successfully bound long
53 //! after the runtime has shut down. With a close bit on only the OwnedTasks,
54 //! the first spawning situation could result in the notification being pushed
55 //! to the inject queue after step 5 of shutdown, which would leave a task in
56 //! the inject queue indefinitely. This would be a ref-count cycle and a memory
57 //! leak.
58 
59 use crate::loom::sync::{Arc, Mutex};
60 use crate::runtime;
61 use crate::runtime::context;
62 use crate::runtime::scheduler::multi_thread::{
63     idle, queue, Counters, Handle, Idle, Overflow, Parker, Stats, TraceStatus, Unparker,
64 };
65 use crate::runtime::scheduler::{inject, Defer, Lock};
66 use crate::runtime::task::OwnedTasks;
67 use crate::runtime::{
68     blocking, coop, driver, scheduler, task, Config, SchedulerMetrics, WorkerMetrics,
69 };
70 use crate::util::atomic_cell::AtomicCell;
71 use crate::util::rand::{FastRand, RngSeedGenerator};
72 
73 use std::cell::RefCell;
74 use std::task::Waker;
75 use std::time::Duration;
76 
77 cfg_metrics! {
78     mod metrics;
79 }
80 
81 cfg_taskdump! {
82     mod taskdump;
83 }
84 
85 cfg_not_taskdump! {
86     mod taskdump_mock;
87 }
88 
89 /// A scheduler worker
90 pub(super) struct Worker {
91     /// Reference to scheduler's handle
92     handle: Arc<Handle>,
93 
94     /// Index holding this worker's remote state
95     index: usize,
96 
97     /// Used to hand-off a worker's core to another thread.
98     core: AtomicCell<Core>,
99 }
100 
101 /// Core data
102 struct Core {
103     /// Used to schedule bookkeeping tasks every so often.
104     tick: u32,
105 
106     /// When a task is scheduled from a worker, it is stored in this slot. The
107     /// worker will check this slot for a task **before** checking the run
108     /// queue. This effectively results in the **last** scheduled task being run
109     /// next (LIFO). This is an optimization for improving locality which
110     /// benefits message passing patterns and helps to reduce latency.
111     lifo_slot: Option<Notified>,
112 
113     /// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
114     /// they go to the back of the `run_queue`.
115     lifo_enabled: bool,
116 
117     /// The worker-local run queue.
118     run_queue: queue::Local<Arc<Handle>>,
119 
120     /// True if the worker is currently searching for more work. Searching
121     /// involves attempting to steal from other workers.
122     is_searching: bool,
123 
124     /// True if the scheduler is being shutdown
125     is_shutdown: bool,
126 
127     /// True if the scheduler is being traced
128     is_traced: bool,
129 
130     /// Parker
131     ///
132     /// Stored in an `Option` as the parker is added / removed to make the
133     /// borrow checker happy.
134     park: Option<Parker>,
135 
136     /// Per-worker runtime stats
137     stats: Stats,
138 
139     /// How often to check the global queue
140     global_queue_interval: u32,
141 
142     /// Fast random number generator.
143     rand: FastRand,
144 }
145 
146 /// State shared across all workers
147 pub(crate) struct Shared {
148     /// Per-worker remote state. All other workers have access to this, and it
149     /// is how they communicate with each other.
150     remotes: Box<[Remote]>,
151 
152     /// Global task queue used for:
153     ///  1. Submit work to the scheduler while **not** currently on a worker thread.
154     ///  2. Submit work to the scheduler when a worker run queue is saturated
155     pub(super) inject: inject::Shared<Arc<Handle>>,
156 
157     /// Coordinates idle workers
158     idle: Idle,
159 
160     /// Collection of all active tasks spawned onto this executor.
161     pub(crate) owned: OwnedTasks<Arc<Handle>>,
162 
163     /// Data synchronized by the scheduler mutex
164     pub(super) synced: Mutex<Synced>,
165 
166     /// Cores that have observed the shutdown signal
167     ///
168     /// The core is **not** placed back in the worker to prevent it from being
169     /// stolen by a thread that was spawned as part of `block_in_place`.
170     #[allow(clippy::vec_box)] // we're moving an already-boxed value
171     shutdown_cores: Mutex<Vec<Box<Core>>>,
172 
173     /// The number of cores that have observed the trace signal.
174     pub(super) trace_status: TraceStatus,
175 
176     /// Scheduler configuration options
177     config: Config,
178 
179     /// Collects metrics from the runtime.
180     pub(super) scheduler_metrics: SchedulerMetrics,
181 
182     pub(super) worker_metrics: Box<[WorkerMetrics]>,
183 
184     /// Only held to trigger some code on drop. This is used to get internal
185     /// runtime metrics that can be useful when doing performance
186     /// investigations. This does nothing (empty struct, no drop impl) unless
187     /// the `tokio_internal_mt_counters` cfg flag is set.
188     _counters: Counters,
189 }
190 
191 /// Data synchronized by the scheduler mutex
192 pub(crate) struct Synced {
193     /// Synchronized state for `Idle`.
194     pub(super) idle: idle::Synced,
195 
196     /// Synchronized state for `Inject`.
197     pub(crate) inject: inject::Synced,
198 }
199 
200 /// Used to communicate with a worker from other threads.
201 struct Remote {
202     /// Steals tasks from this worker.
203     pub(super) steal: queue::Steal<Arc<Handle>>,
204 
205     /// Unparks the associated worker thread
206     unpark: Unparker,
207 }
208 
209 /// Thread-local context
210 pub(crate) struct Context {
211     /// Worker
212     worker: Arc<Worker>,
213 
214     /// Core data
215     core: RefCell<Option<Box<Core>>>,
216 
217     /// Tasks to wake after resource drivers are polled. This is mostly to
218     /// handle yielded tasks.
219     pub(crate) defer: Defer,
220 }
221 
222 /// Starts the workers
223 pub(crate) struct Launch(Vec<Arc<Worker>>);
224 
225 /// Running a task may consume the core. If the core is still available when
226 /// running the task completes, it is returned. Otherwise, the worker will need
227 /// to stop processing.
228 type RunResult = Result<Box<Core>, ()>;
229 
230 /// A task handle
231 type Task = task::Task<Arc<Handle>>;
232 
233 /// A notified task handle
234 type Notified = task::Notified<Arc<Handle>>;
235 
236 /// Value picked out of thin air. Running the LIFO slot a handful of times
237 /// seems sufficient to benefit from locality. More than 3 times is probably
238 /// overdoing it. The value can be tuned in the future with data that shows
239 /// improvements.
240 const MAX_LIFO_POLLS_PER_TICK: usize = 3;
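
// The kind of message-passing workload the LIFO slot (and the cap above) is
// tuned for, sketched with public APIs only: two tasks that repeatedly wake
// each other. The LIFO slot keeps the woken peer cache-hot on the same worker,
// while the cap keeps the pair from starving the rest of the run queue.
//
// ```ignore
// use tokio::sync::mpsc;
//
// #[tokio::main(flavor = "multi_thread")]
// async fn main() {
//     let (tx_a, mut rx_a) = mpsc::channel::<u32>(1);
//     let (tx_b, mut rx_b) = mpsc::channel::<u32>(1);
//
//     tokio::spawn(async move {
//         while let Some(n) = rx_a.recv().await {
//             if tx_b.send(n + 1).await.is_err() {
//                 break;
//             }
//         }
//     });
//
//     tx_a.send(0).await.unwrap();
//     assert_eq!(rx_b.recv().await, Some(1));
// }
// ```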
241 
242 pub(super) fn create(
243     size: usize,
244     park: Parker,
245     driver_handle: driver::Handle,
246     blocking_spawner: blocking::Spawner,
247     seed_generator: RngSeedGenerator,
248     config: Config,
249 ) -> (Arc<Handle>, Launch) {
250     let mut cores = Vec::with_capacity(size);
251     let mut remotes = Vec::with_capacity(size);
252     let mut worker_metrics = Vec::with_capacity(size);
253 
254     // Create the local queues
255     for _ in 0..size {
256         let (steal, run_queue) = queue::local();
257 
258         let park = park.clone();
259         let unpark = park.unpark();
260         let metrics = WorkerMetrics::from_config(&config);
261         let stats = Stats::new(&metrics);
262 
263         cores.push(Box::new(Core {
264             tick: 0,
265             lifo_slot: None,
266             lifo_enabled: !config.disable_lifo_slot,
267             run_queue,
268             is_searching: false,
269             is_shutdown: false,
270             is_traced: false,
271             park: Some(park),
272             global_queue_interval: stats.tuned_global_queue_interval(&config),
273             stats,
274             rand: FastRand::from_seed(config.seed_generator.next_seed()),
275         }));
276 
277         remotes.push(Remote { steal, unpark });
278         worker_metrics.push(metrics);
279     }
280 
281     let (idle, idle_synced) = Idle::new(size);
282     let (inject, inject_synced) = inject::Shared::new();
283 
284     let remotes_len = remotes.len();
285     let handle = Arc::new(Handle {
286         shared: Shared {
287             remotes: remotes.into_boxed_slice(),
288             inject,
289             idle,
290             owned: OwnedTasks::new(),
291             synced: Mutex::new(Synced {
292                 idle: idle_synced,
293                 inject: inject_synced,
294             }),
295             shutdown_cores: Mutex::new(vec![]),
296             trace_status: TraceStatus::new(remotes_len),
297             config,
298             scheduler_metrics: SchedulerMetrics::new(),
299             worker_metrics: worker_metrics.into_boxed_slice(),
300             _counters: Counters,
301         },
302         driver: driver_handle,
303         blocking_spawner,
304         seed_generator,
305     });
306 
307     let mut launch = Launch(vec![]);
308 
309     for (index, core) in cores.drain(..).enumerate() {
310         launch.0.push(Arc::new(Worker {
311             handle: handle.clone(),
312             index,
313             core: AtomicCell::new(Some(core)),
314         }));
315     }
316 
317     (handle, launch)
318 }
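
// For reference, `size` above is the number of worker threads requested when
// building the runtime. A sketch of the usual entry point (public API; the
// exact options are illustrative):
//
// ```ignore
// let rt = tokio::runtime::Builder::new_multi_thread()
//     .worker_threads(4) // `size` = 4: four cores, remotes, and metric slots
//     .enable_all()
//     .build()
//     .unwrap();
// ```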
319 
320 #[track_caller]
321 pub(crate) fn block_in_place<F, R>(f: F) -> R
322 where
323     F: FnOnce() -> R,
324 {
325     // Try to steal the worker core back
326     struct Reset {
327         take_core: bool,
328         budget: coop::Budget,
329     }
330 
331     impl Drop for Reset {
332         fn drop(&mut self) {
333             with_current(|maybe_cx| {
334                 if let Some(cx) = maybe_cx {
335                     if self.take_core {
336                         let core = cx.worker.core.take();
337                         let mut cx_core = cx.core.borrow_mut();
338                         assert!(cx_core.is_none());
339                         *cx_core = core;
340                     }
341 
342                     // Reset the task budget as we are re-entering the
343                     // runtime.
344                     coop::set(self.budget);
345                 }
346             });
347         }
348     }
349 
350     let mut had_entered = false;
351     let mut take_core = false;
352 
353     let setup_result = with_current(|maybe_cx| {
354         match (
355             crate::runtime::context::current_enter_context(),
356             maybe_cx.is_some(),
357         ) {
358             (context::EnterRuntime::Entered { .. }, true) => {
359                 // We are on a thread pool runtime thread, so we just need to
360                 // set up blocking.
361                 had_entered = true;
362             }
363             (
364                 context::EnterRuntime::Entered {
365                     allow_block_in_place,
366                 },
367                 false,
368             ) => {
369                 // We are on an executor, but _not_ on the thread pool.  That is
370                 // _only_ okay if we are in a thread pool runtime's block_on
371                 // method:
372                 if allow_block_in_place {
373                     had_entered = true;
374                     return Ok(());
375                 } else {
376                     // This probably means we are on the current_thread runtime or in a
377                     // LocalSet, where it is _not_ okay to block.
378                     return Err(
379                         "can call blocking only when running on the multi-threaded runtime",
380                     );
381                 }
382             }
383             (context::EnterRuntime::NotEntered, true) => {
384                 // This is a nested call to block_in_place (we already exited).
385                 // All the necessary setup has already been done.
386                 return Ok(());
387             }
388             (context::EnterRuntime::NotEntered, false) => {
389                 // We are outside of the tokio runtime, so blocking is fine.
390                 // We can also skip all of the thread pool blocking setup steps.
391                 return Ok(());
392             }
393         }
394 
395         let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
396 
397         // Get the worker core. If none is set, then blocking is fine!
398         let core = match cx.core.borrow_mut().take() {
399             Some(core) => core,
400             None => return Ok(()),
401         };
402 
403         // We are taking the core from the context and sending it to another
404         // thread.
405         take_core = true;
406 
407         // The parker should be set here
408         assert!(core.park.is_some());
409 
410         // In order to block, the core must be sent to another thread for
411         // execution.
412         //
413         // First, move the core back into the worker's shared core slot.
414         cx.worker.core.set(core);
415 
416         // Next, clone the worker handle and send it to a new thread for
417         // processing.
418         //
419         // Once the blocking task is done executing, we will attempt to
420         // steal the core back.
421         let worker = cx.worker.clone();
422         runtime::spawn_blocking(move || run(worker));
423         Ok(())
424     });
425 
426     if let Err(panic_message) = setup_result {
427         panic!("{}", panic_message);
428     }
429 
430     if had_entered {
431         // Unset the current task's budget. Blocking sections are not
432         // constrained by task budgets.
433         let _reset = Reset {
434             take_core,
435             budget: coop::stop(),
436         };
437 
438         crate::runtime::context::exit_runtime(f)
439     } else {
440         f()
441     }
442 }
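
// One pattern the state machine above is meant to support (a hedged sketch
// using only public APIs): re-entering the runtime from inside the blocking
// section via `Handle::block_on`.
//
// ```ignore
// use tokio::runtime::Handle;
//
// #[tokio::main(flavor = "multi_thread")]
// async fn main() {
//     tokio::task::block_in_place(|| {
//         // Blocking work here ...
//
//         // ... and async work when needed, by re-entering the runtime.
//         Handle::current().block_on(async {
//             tokio::time::sleep(std::time::Duration::from_millis(10)).await;
//         });
//     });
// }
// ```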
443 
444 impl Launch {
445     pub(crate) fn launch(mut self) {
446         for worker in self.0.drain(..) {
447             runtime::spawn_blocking(move || run(worker));
448         }
449     }
450 }
451 
452 fn run(worker: Arc<Worker>) {
453     struct AbortOnPanic;
454 
455     impl Drop for AbortOnPanic {
456         fn drop(&mut self) {
457             if std::thread::panicking() {
458                 eprintln!("worker thread panicking; aborting process");
459                 std::process::abort();
460             }
461         }
462     }
463 
464     // Catching panics on worker threads in tests is quite tricky. Instead, when
465     // debug assertions are enabled, we just abort the process.
466     #[cfg(debug_assertions)]
467     let _abort_on_panic = AbortOnPanic;
468 
469     // Acquire a core. If this fails, then another thread is running this
470     // worker and there is nothing further to do.
471     let core = match worker.core.take() {
472         Some(core) => core,
473         None => return,
474     };
475 
476     let handle = scheduler::Handle::MultiThread(worker.handle.clone());
477 
478     crate::runtime::context::enter_runtime(&handle, true, |_| {
479         // Set the worker context.
480         let cx = scheduler::Context::MultiThread(Context {
481             worker,
482             core: RefCell::new(None),
483             defer: Defer::new(),
484         });
485 
486         context::set_scheduler(&cx, || {
487             let cx = cx.expect_multi_thread();
488 
489             // This should always be an error. It only returns a `Result` to support
490             // using `?` to short circuit.
491             assert!(cx.run(core).is_err());
492 
493             // Check if there are any deferred tasks to notify. This can happen when
494             // the worker core is lost due to `block_in_place()` being called from
495             // within the task.
496             cx.defer.wake();
497         });
498     });
499 }
500 
501 impl Context {
502     fn run(&self, mut core: Box<Core>) -> RunResult {
503         // Reset `lifo_enabled` here in case the core was previously stolen from
504         // a task that had the LIFO slot disabled.
505         self.reset_lifo_enabled(&mut core);
506 
507         // Start in the "processing tasks" state, as polling tasks from the
508         // local queue will be one of the first things we do.
509         core.stats.start_processing_scheduled_tasks();
510 
511         while !core.is_shutdown {
512             self.assert_lifo_enabled_is_correct(&core);
513 
514             if core.is_traced {
515                 core = self.worker.handle.trace_core(core);
516             }
517 
518             // Increment the tick
519             core.tick();
520 
521             // Run maintenance, if needed
522             core = self.maintenance(core);
523 
524             // First, check work available to the current worker.
525             if let Some(task) = core.next_task(&self.worker) {
526                 core = self.run_task(task, core)?;
527                 continue;
528             }
529 
530             // We consumed all work in the queues and will start searching for work.
531             core.stats.end_processing_scheduled_tasks();
532 
533             // There is no more **local** work to process, try to steal work
534             // from other workers.
535             if let Some(task) = core.steal_work(&self.worker) {
536                 // Found work, switch back to processing
537                 core.stats.start_processing_scheduled_tasks();
538                 core = self.run_task(task, core)?;
539             } else {
540                 // Wait for work
541                 core = if !self.defer.is_empty() {
542                     self.park_timeout(core, Some(Duration::from_millis(0)))
543                 } else {
544                     self.park(core)
545                 };
546             }
547         }
548 
549         core.pre_shutdown(&self.worker);
550 
551         // Signal shutdown
552         self.worker.handle.shutdown_core(core);
553         Err(())
554     }
555 
556     fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
557         let task = self.worker.handle.shared.owned.assert_owner(task);
558 
559         // Make sure the worker is not in the **searching** state. This enables
560         // another idle worker to try to steal work.
561         core.transition_from_searching(&self.worker);
562 
563         self.assert_lifo_enabled_is_correct(&core);
564 
565         // Measure the poll start time. Note that we may end up polling other
566         // tasks under this measurement. In this case, the tasks came from the
567         // LIFO slot and are considered part of the current task for scheduling
568         // purposes. These tasks inherit the "parent"'s limits.
569         core.stats.start_poll();
570 
571         // Make the core available to the runtime context
572         *self.core.borrow_mut() = Some(core);
573 
574         // Run the task
575         coop::budget(|| {
576             task.run();
577             let mut lifo_polls = 0;
578 
579             // As long as there is budget remaining and a task exists in the
580             // `lifo_slot`, then keep running.
581             loop {
582                 // Check if we still have the core. If not, the core was stolen
583                 // by another worker.
584                 let mut core = match self.core.borrow_mut().take() {
585                     Some(core) => core,
586                     None => {
587                         // In this case, we cannot call `reset_lifo_enabled()`
588                         // because the core was stolen. The stealer will handle
589                         // that at the top of `Context::run`
590                         return Err(());
591                     }
592                 };
593 
594                 // Check for a task in the LIFO slot
595                 let task = match core.lifo_slot.take() {
596                     Some(task) => task,
597                     None => {
598                         self.reset_lifo_enabled(&mut core);
599                         core.stats.end_poll();
600                         return Ok(core);
601                     }
602                 };
603 
604                 if !coop::has_budget_remaining() {
605                     core.stats.end_poll();
606 
607                     // Not enough budget left to run the LIFO task, push it to
608                     // the back of the queue and return.
609                     core.run_queue.push_back_or_overflow(
610                         task,
611                         &*self.worker.handle,
612                         &mut core.stats,
613                     );
614                     // If we hit this point, the LIFO slot should be enabled.
615                     // There is no need to reset it.
616                     debug_assert!(core.lifo_enabled);
617                     return Ok(core);
618                 }
619 
620                 // Track that we are about to run a task from the LIFO slot.
621                 lifo_polls += 1;
622                 super::counters::inc_lifo_schedules();
623 
624                 // Disable the LIFO slot if we reach our limit
625                 //
626                 // In ping-pong style workloads where task A notifies task B,
627                 // which notifies task A again, continuously prioritizing the
628                 // LIFO slot can cause starvation as these two tasks will
629                 // repeatedly schedule the other. To mitigate this, we limit the
630                 // number of times the LIFO slot is prioritized.
631                 if lifo_polls >= MAX_LIFO_POLLS_PER_TICK {
632                     core.lifo_enabled = false;
633                     super::counters::inc_lifo_capped();
634                 }
635 
636                 // Run the LIFO task, then loop
637                 *self.core.borrow_mut() = Some(core);
638                 let task = self.worker.handle.shared.owned.assert_owner(task);
639                 task.run();
640             }
641         })
642     }
643 
644     fn reset_lifo_enabled(&self, core: &mut Core) {
645         core.lifo_enabled = !self.worker.handle.shared.config.disable_lifo_slot;
646     }
647 
648     fn assert_lifo_enabled_is_correct(&self, core: &Core) {
649         debug_assert_eq!(
650             core.lifo_enabled,
651             !self.worker.handle.shared.config.disable_lifo_slot
652         );
653     }
654 
655     fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
656         if core.tick % self.worker.handle.shared.config.event_interval == 0 {
657             super::counters::inc_num_maintenance();
658 
659             core.stats.end_processing_scheduled_tasks();
660 
661             // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
662             // to run without actually putting the thread to sleep.
663             core = self.park_timeout(core, Some(Duration::from_millis(0)));
664 
665             // Run regularly scheduled maintenance
666             core.maintenance(&self.worker);
667 
668             core.stats.start_processing_scheduled_tasks();
669         }
670 
671         core
672     }
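
    // Note: with the default `event_interval` of 61 ticks, the maintenance path
    // above runs roughly once every 61 scheduled tasks: the driver is polled
    // with a zero timeout, shutdown/trace signals are checked, and per-worker
    // stats are flushed to the metrics.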
673 
674     /// Parks the worker thread while waiting for tasks to execute.
675     ///
676     /// This function checks that there is indeed no more work left to be done before parking.
677     /// It is also important to note that, before parking, the worker thread will try to take
678     /// ownership of the Driver (IO/Time) and dispatch any events that might have fired.
679     /// Whenever a worker thread executes the Driver loop, all woken tasks are scheduled
680     /// in its own local queue until the queue saturates (ntasks > LOCAL_QUEUE_CAPACITY).
681     /// When the local queue is saturated, the overflowing tasks are added to the injection
682     /// queue, from where other workers can pick them up.
683     /// We also rely on the work-stealing algorithm to spread the tasks among workers
684     /// after all the I/O events have been dispatched.
685     fn park(&self, mut core: Box<Core>) -> Box<Core> {
686         if let Some(f) = &self.worker.handle.shared.config.before_park {
687             f();
688         }
689 
690         if core.transition_to_parked(&self.worker) {
691             while !core.is_shutdown && !core.is_traced {
692                 core.stats.about_to_park();
693                 core = self.park_timeout(core, None);
694 
695                 // Run regularly scheduled maintenance
696                 core.maintenance(&self.worker);
697 
698                 if core.transition_from_parked(&self.worker) {
699                     break;
700                 }
701             }
702         }
703 
704         if let Some(f) = &self.worker.handle.shared.config.after_unpark {
705             f();
706         }
707         core
708     }
709 
710     fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
711         self.assert_lifo_enabled_is_correct(&core);
712 
713         // Take the parker out of core
714         let mut park = core.park.take().expect("park missing");
715 
716         // Store `core` in context
717         *self.core.borrow_mut() = Some(core);
718 
719         // Park thread
720         if let Some(timeout) = duration {
721             park.park_timeout(&self.worker.handle.driver, timeout);
722         } else {
723             park.park(&self.worker.handle.driver);
724         }
725 
726         self.defer.wake();
727 
728         // Remove `core` from context
729         core = self.core.borrow_mut().take().expect("core missing");
730 
731         // Place `park` back in `core`
732         core.park = Some(park);
733 
734         if core.should_notify_others() {
735             self.worker.handle.notify_parked_local();
736         }
737 
738         core
739     }
740 
741     pub(crate) fn defer(&self, waker: &Waker) {
742         self.defer.defer(waker);
743     }
744 }
745 
746 impl Core {
747     /// Increment the tick
748     fn tick(&mut self) {
749         self.tick = self.tick.wrapping_add(1);
750     }
751 
752     /// Return the next notified task available to this worker.
753     fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
754         if self.tick % self.global_queue_interval == 0 {
755             // Update the global queue interval, if needed
756             self.tune_global_queue_interval(worker);
757 
758             worker
759                 .handle
760                 .next_remote_task()
761                 .or_else(|| self.next_local_task())
762         } else {
763             let maybe_task = self.next_local_task();
764 
765             if maybe_task.is_some() {
766                 return maybe_task;
767             }
768 
769             if worker.inject().is_empty() {
770                 return None;
771             }
772 
773             // Other threads can only **remove** tasks from the current worker's
774             // `run_queue`. So, we can be confident that by the time we call
775             // `run_queue.push_back` below, there will be *at least* `cap`
776             // available slots in the queue.
777             let cap = usize::min(
778                 self.run_queue.remaining_slots(),
779                 self.run_queue.max_capacity() / 2,
780             );
781 
782             // The worker is currently idle, pull a batch of work from the
783             // injection queue. We don't want to pull *all* the work so other
784             // workers can also get some.
785             let n = usize::min(
786                 worker.inject().len() / worker.handle.shared.remotes.len() + 1,
787                 cap,
788             );
789 
790             // Take at least one task since the first task is returned directly
791             // and not pushed onto the local queue.
792             let n = usize::max(1, n);
793 
794             let mut synced = worker.handle.shared.synced.lock();
795             // safety: passing in the correct `inject::Synced`.
796             let mut tasks = unsafe { worker.inject().pop_n(&mut synced.inject, n) };
797 
798             // Pop the first task to return immediately
799             let ret = tasks.next();
800 
801             // Push the rest onto the run queue
802             self.run_queue.push_back(tasks);
803 
804             ret
805         }
806     }
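
    // Worked example of the batching math above, with illustrative numbers:
    // 4 workers, a local queue with 200 free slots and a max capacity of 256,
    // and 60 tasks sitting in the inject queue:
    //
    //   cap = min(200, 256 / 2)    = 128
    //   n   = min(60 / 4 + 1, 128) = 16
    //
    // so 16 tasks are popped from the inject queue; the first is returned to
    // run immediately and the remaining 15 are pushed onto the local queue.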
807 
808     fn next_local_task(&mut self) -> Option<Notified> {
809         self.lifo_slot.take().or_else(|| self.run_queue.pop())
810     }
811 
812     /// Function responsible for stealing tasks from another worker
813     ///
814     /// Note: a worker will only try to steal if fewer than half of the workers
815     /// are already searching for tasks to steal. The idea is to make sure not all
816     /// workers will be trying to steal at the same time.
817     fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
818         if !self.transition_to_searching(worker) {
819             return None;
820         }
821 
822         let num = worker.handle.shared.remotes.len();
823         // Start from a random worker
824         let start = self.rand.fastrand_n(num as u32) as usize;
825 
826         for i in 0..num {
827             let i = (start + i) % num;
828 
829             // Don't steal from ourself! We know we don't have work.
830             if i == worker.index {
831                 continue;
832             }
833 
834             let target = &worker.handle.shared.remotes[i];
835             if let Some(task) = target
836                 .steal
837                 .steal_into(&mut self.run_queue, &mut self.stats)
838             {
839                 return Some(task);
840             }
841         }
842 
843         // Fallback on checking the global queue
844         worker.handle.next_remote_task()
845     }
846 
847     fn transition_to_searching(&mut self, worker: &Worker) -> bool {
848         if !self.is_searching {
849             self.is_searching = worker.handle.shared.idle.transition_worker_to_searching();
850         }
851 
852         self.is_searching
853     }
854 
855     fn transition_from_searching(&mut self, worker: &Worker) {
856         if !self.is_searching {
857             return;
858         }
859 
860         self.is_searching = false;
861         worker.handle.transition_worker_from_searching();
862     }
863 
864     fn has_tasks(&self) -> bool {
865         self.lifo_slot.is_some() || self.run_queue.has_tasks()
866     }
867 
868     fn should_notify_others(&self) -> bool {
869         // If there are tasks available to steal, but this worker is not
870         // looking for tasks to steal, notify another worker.
871         if self.is_searching {
872             return false;
873         }
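        // More than one task is queued locally: this worker will run one of
        // them itself, so it is worth waking another worker to steal the rest.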
874         self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
875     }
876 
877     /// Prepares the worker state for parking.
878     ///
879     /// Returns true if the transition happened, false if there is work to do first.
880     fn transition_to_parked(&mut self, worker: &Worker) -> bool {
881         // Workers should not park if they have work to do
882         if self.has_tasks() || self.is_traced {
883             return false;
884         }
885 
886         // When the final worker transitions **out** of searching to parked, it
887         // must check all the queues one last time in case work materialized
888         // between the last work scan and transitioning out of searching.
889         let is_last_searcher = worker.handle.shared.idle.transition_worker_to_parked(
890             &worker.handle.shared,
891             worker.index,
892             self.is_searching,
893         );
894 
895         // The worker is no longer searching. Setting this only updates the
896         // local cache.
897         self.is_searching = false;
898 
899         if is_last_searcher {
900             worker.handle.notify_if_work_pending();
901         }
902 
903         true
904     }
905 
906     /// Returns `true` if the transition happened.
907     fn transition_from_parked(&mut self, worker: &Worker) -> bool {
908         // If a task is in the lifo slot/run queue, then we must unpark regardless of
909         // being notified.
910         if self.has_tasks() {
911             // When a worker wakes, it should only transition to the "searching"
912             // state when the wake originates from another worker *or* a new task
913             // is pushed. We do *not* want the worker to transition to "searching"
914             // when it wakes when the I/O driver receives new events.
915             self.is_searching = !worker
916                 .handle
917                 .shared
918                 .idle
919                 .unpark_worker_by_id(&worker.handle.shared, worker.index);
920             return true;
921         }
922 
923         if worker
924             .handle
925             .shared
926             .idle
927             .is_parked(&worker.handle.shared, worker.index)
928         {
929             return false;
930         }
931 
932         // When unparked, the worker is in the searching state.
933         self.is_searching = true;
934         true
935     }
936 
937     /// Runs maintenance work such as checking the pool's state.
938     fn maintenance(&mut self, worker: &Worker) {
939         self.stats
940             .submit(&worker.handle.shared.worker_metrics[worker.index]);
941 
942         if !self.is_shutdown {
943             // Check if the scheduler has been shutdown
944             let synced = worker.handle.shared.synced.lock();
945             self.is_shutdown = worker.inject().is_closed(&synced.inject);
946         }
947 
948         if !self.is_traced {
949             // Check if the worker should be tracing.
950             self.is_traced = worker.handle.shared.trace_status.trace_requested();
951         }
952     }
953 
954     /// Signals all tasks to shut down, and waits for them to complete. Must run
955     /// before we enter the single-threaded phase of shutdown processing.
956     fn pre_shutdown(&mut self, worker: &Worker) {
957         // Signal to all tasks to shut down.
958         worker.handle.shared.owned.close_and_shutdown_all();
959 
960         self.stats
961             .submit(&worker.handle.shared.worker_metrics[worker.index]);
962     }
963 
964     /// Shuts down the core.
965     fn shutdown(&mut self, handle: &Handle) {
966         // Take the parker out of the core
967         let mut park = self.park.take().expect("park missing");
968 
969         // Drain the queue
970         while self.next_local_task().is_some() {}
971 
972         park.shutdown(&handle.driver);
973     }
974 
975     fn tune_global_queue_interval(&mut self, worker: &Worker) {
976         let next = self
977             .stats
978             .tuned_global_queue_interval(&worker.handle.shared.config);
979 
980         debug_assert!(next > 1);
981 
982         // Smooth out jitter
983         if abs_diff(self.global_queue_interval, next) > 2 {
984             self.global_queue_interval = next;
985         }
986     }
987 }
988 
989 impl Worker {
990     /// Returns a reference to the scheduler's injection queue.
991     fn inject(&self) -> &inject::Shared<Arc<Handle>> {
992         &self.handle.shared.inject
993     }
994 }
995 
996 // TODO: Move `Handle` impls into handle.rs
997 impl task::Schedule for Arc<Handle> {
998     fn release(&self, task: &Task) -> Option<Task> {
999         self.shared.owned.remove(task)
1000     }
1001 
1002     fn schedule(&self, task: Notified) {
1003         self.schedule_task(task, false);
1004     }
1005 
1006     fn yield_now(&self, task: Notified) {
1007         self.schedule_task(task, true);
1008     }
1009 }
1010 
1011 impl Handle {
1012     pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) {
1013         with_current(|maybe_cx| {
1014             if let Some(cx) = maybe_cx {
1015                 // Make sure the task is part of the **current** scheduler.
1016                 if self.ptr_eq(&cx.worker.handle) {
1017                     // And the current thread still holds a core
1018                     if let Some(core) = cx.core.borrow_mut().as_mut() {
1019                         self.schedule_local(core, task, is_yield);
1020                         return;
1021                     }
1022                 }
1023             }
1024 
1025             // Otherwise, use the inject queue.
1026             self.push_remote_task(task);
1027             self.notify_parked_remote();
1028         })
1029     }
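
    // The `push_remote_task` branch above is, for example, what a spawn from
    // outside the runtime goes through (a sketch using only public APIs):
    //
    // ```ignore
    // use tokio::runtime::Runtime;
    //
    // fn main() {
    //     let rt = Runtime::new().unwrap();
    //     let handle = rt.handle().clone();
    //
    //     std::thread::spawn(move || {
    //         // Not a worker thread, so the task goes through the inject queue
    //         // and a parked worker is notified.
    //         handle.spawn(async { println!("hello from the inject queue") });
    //     })
    //     .join()
    //     .unwrap();
    // }
    // ```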
1030 
1031     pub(super) fn schedule_option_task_without_yield(&self, task: Option<Notified>) {
1032         if let Some(task) = task {
1033             self.schedule_task(task, false);
1034         }
1035     }
1036 
1037     fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
1038         core.stats.inc_local_schedule_count();
1039 
1040         // Spawning from the worker thread. If scheduling a "yield" then the
1041         // task must always be pushed to the back of the queue, enabling other
1042         // tasks to be executed. If **not** a yield, then there is more
1043         // flexibility and the task may go to the front of the queue.
1044         let should_notify = if is_yield || !core.lifo_enabled {
1045             core.run_queue
1046                 .push_back_or_overflow(task, self, &mut core.stats);
1047             true
1048         } else {
1049             // Push to the LIFO slot
1050             let prev = core.lifo_slot.take();
1051             let ret = prev.is_some();
1052 
1053             if let Some(prev) = prev {
1054                 core.run_queue
1055                     .push_back_or_overflow(prev, self, &mut core.stats);
1056             }
1057 
1058             core.lifo_slot = Some(task);
1059 
1060             ret
1061         };
1062 
1063         // Only notify if not currently parked. If `park` is `None`, then the
1064         // scheduling is from a resource driver. As notifications often come in
1065         // batches, the notification is delayed until the park is complete.
1066         if should_notify && core.park.is_some() {
1067             self.notify_parked_local();
1068         }
1069     }
1070 
1071     fn next_remote_task(&self) -> Option<Notified> {
1072         if self.shared.inject.is_empty() {
1073             return None;
1074         }
1075 
1076         let mut synced = self.shared.synced.lock();
1077         // safety: passing in correct `inject::Synced`
1078         unsafe { self.shared.inject.pop(&mut synced.inject) }
1079     }
1080 
1081     fn push_remote_task(&self, task: Notified) {
1082         self.shared.scheduler_metrics.inc_remote_schedule_count();
1083 
1084         let mut synced = self.shared.synced.lock();
1085         // safety: passing in correct `inject::Synced`
1086         unsafe {
1087             self.shared.inject.push(&mut synced.inject, task);
1088         }
1089     }
1090 
1091     pub(super) fn close(&self) {
1092         if self
1093             .shared
1094             .inject
1095             .close(&mut self.shared.synced.lock().inject)
1096         {
1097             self.notify_all();
1098         }
1099     }
1100 
1101     fn notify_parked_local(&self) {
1102         super::counters::inc_num_inc_notify_local();
1103 
1104         if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1105             super::counters::inc_num_unparks_local();
1106             self.shared.remotes[index].unpark.unpark(&self.driver);
1107         }
1108     }
1109 
1110     fn notify_parked_remote(&self) {
1111         if let Some(index) = self.shared.idle.worker_to_notify(&self.shared) {
1112             self.shared.remotes[index].unpark.unpark(&self.driver);
1113         }
1114     }
1115 
1116     pub(super) fn notify_all(&self) {
1117         for remote in &self.shared.remotes[..] {
1118             remote.unpark.unpark(&self.driver);
1119         }
1120     }
1121 
1122     fn notify_if_work_pending(&self) {
1123         for remote in &self.shared.remotes[..] {
1124             if !remote.steal.is_empty() {
1125                 self.notify_parked_local();
1126                 return;
1127             }
1128         }
1129 
1130         if !self.shared.inject.is_empty() {
1131             self.notify_parked_local();
1132         }
1133     }
1134 
1135     fn transition_worker_from_searching(&self) {
1136         if self.shared.idle.transition_worker_from_searching() {
1137             // We are the final searching worker. Because work was found, we
1138             // need to notify another worker.
1139             self.notify_parked_local();
1140         }
1141     }
1142 
1143     /// Signals that a worker has observed the shutdown signal and has replaced
1144     /// its core back into its handle.
1145     ///
1146     /// If all workers have reached this point, the final cleanup is performed.
1147     fn shutdown_core(&self, core: Box<Core>) {
1148         let mut cores = self.shared.shutdown_cores.lock();
1149         cores.push(core);
1150 
1151         if cores.len() != self.shared.remotes.len() {
1152             return;
1153         }
1154 
1155         debug_assert!(self.shared.owned.is_empty());
1156 
1157         for mut core in cores.drain(..) {
1158             core.shutdown(self);
1159         }
1160 
1161         // Drain the injection queue
1162         //
1163         // We already shut down every task, so we can simply drop the tasks.
1164         while let Some(task) = self.next_remote_task() {
1165             drop(task);
1166         }
1167     }
1168 
1169     fn ptr_eq(&self, other: &Handle) -> bool {
1170         std::ptr::eq(self, other)
1171     }
1172 }
1173 
1174 impl Overflow<Arc<Handle>> for Handle {
1175     fn push(&self, task: task::Notified<Arc<Handle>>) {
1176         self.push_remote_task(task);
1177     }
1178 
1179     fn push_batch<I>(&self, iter: I)
1180     where
1181         I: Iterator<Item = task::Notified<Arc<Handle>>>,
1182     {
1183         unsafe {
1184             self.shared.inject.push_batch(self, iter);
1185         }
1186     }
1187 }
1188 
1189 pub(crate) struct InjectGuard<'a> {
1190     lock: crate::loom::sync::MutexGuard<'a, Synced>,
1191 }
1192 
1193 impl<'a> AsMut<inject::Synced> for InjectGuard<'a> {
1194     fn as_mut(&mut self) -> &mut inject::Synced {
1195         &mut self.lock.inject
1196     }
1197 }
1198 
1199 impl<'a> Lock<inject::Synced> for &'a Handle {
1200     type Handle = InjectGuard<'a>;
1201 
1202     fn lock(self) -> Self::Handle {
1203         InjectGuard {
1204             lock: self.shared.synced.lock(),
1205         }
1206     }
1207 }
1208 
1209 #[track_caller]
1210 fn with_current<R>(f: impl FnOnce(Option<&Context>) -> R) -> R {
1211     use scheduler::Context::MultiThread;
1212 
1213     context::with_scheduler(|ctx| match ctx {
1214         Some(MultiThread(ctx)) => f(Some(ctx)),
1215         _ => f(None),
1216     })
1217 }
1218 
1219 // `u32::abs_diff` is not available on Tokio's MSRV.
1220 fn abs_diff(a: u32, b: u32) -> u32 {
1221     if a > b {
1222         a - b
1223     } else {
1224         b - a
1225     }
1226 }
1227